In [1]:
# import os, sys
# os.environ["PYSPARK_PYTHON"] = sys.executable
# os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
# Importing Necessary modules
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, count
from pyspark.sql.functions import first, explode
from pyspark.sql.functions import round, when, dayofweek
from pyspark.sql.functions import col
from pyspark.sql.functions import split, udf, concat_ws
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.types import ArrayType, StringType, IntegerType,  StructType, \
                    StructField, DoubleType

import sys
from pathlib import Path
sys.path.append(str(Path.cwd().parent))
from src import constants

In [3]:
spark = SparkSession.builder.appName("appName").config("spark.driver.extraClassPath", "C:\\mysql-connector-j-8.0.33.jar").getOrCreate()

# 1. Load Data

## Q1) 1.1 Load data from MySQL

In [4]:
def load_data_from_mysql(spark :pyspark.sql.SparkSession, db_name :str, table_name :str) \
                        -> pyspark.sql.DataFrame:
    '''
        – load data from MySQL table 'table_name' from database 'db_name' and return the 
          table as a spark dataframe

        For the mysql connection use below information:
            jdbc driver: 'com.mysql.cj.jdbc.Driver'
            Hostname: 'localhost'
            Port: 3306
            Database: 'classicmodels'
            Table_name: 'order_details'
            Username: 'mysql_user'
            Password: 'user'
    '''

    spark_df = spark.read \
        .format("jdbc") \
        .option("driver","com.mysql.cj.jdbc.Driver") \
        .option("url", "jdbc:mysql://localhost:3306/"+db_name) \
        .option("dbtable", table_name) \
        .option("user", "mysql_user") \
        .option("password", "user") \
        .load()
    # print(spark_df.show(truncate=False))
    return spark_df


In [5]:
for row in load_data_from_mysql(spark, "classicmodels", 'orderdetails').collect():
    print(row)

Row(orderNumber=10100, productCode='S18_1749', quantityOrdered=30, priceEach=Decimal('136.00'), orderLineNumber=3)
Row(orderNumber=10100, productCode='S18_2248', quantityOrdered=50, priceEach=Decimal('55.09'), orderLineNumber=2)
Row(orderNumber=10100, productCode='S18_4409', quantityOrdered=22, priceEach=Decimal('75.46'), orderLineNumber=4)
Row(orderNumber=10100, productCode='S24_3969', quantityOrdered=49, priceEach=Decimal('35.29'), orderLineNumber=1)
Row(orderNumber=10101, productCode='S18_2325', quantityOrdered=25, priceEach=Decimal('108.06'), orderLineNumber=4)
Row(orderNumber=10101, productCode='S18_2795', quantityOrdered=26, priceEach=Decimal('167.06'), orderLineNumber=1)
Row(orderNumber=10101, productCode='S24_1937', quantityOrdered=45, priceEach=Decimal('32.53'), orderLineNumber=3)
Row(orderNumber=10101, productCode='S24_2022', quantityOrdered=46, priceEach=Decimal('44.35'), orderLineNumber=2)
Row(orderNumber=10102, productCode='S18_1342', quantityOrdered=39, priceEach=Decimal(

## Q2) 1.2  Load data from csv

In [6]:
def load_data_from_csv(spark :pyspark.sql.SparkSession, csv_file_name: str) \
                      -> pyspark.sql.DataFrame:
    '''
    Load data from CSV file 'csv_file_name' and return a spark Dataframe
    PS: The data files for this assignment are in 'data' folder
    You can access full path of 'data/' folder using 'DATA_FOLDER' variable
    from constants.py
    ''' 
    return spark.read.csv(constants.DATA_FOLDER + csv_file_name, header=True, inferSchema=True)

In [7]:
load_data_from_csv(spark, "customers.csv").show(truncate=False)

+--------------+----------------------------+---------------+----------------+-----------------+----------------------------+------------------------+-------------+--------+----------+---------+----------------------+-----------+
|customerNumber|customerName                |contactLastName|contactFirstName|phone            |addressLine1                |addressLine2            |city         |state   |postalCode|country  |salesRepEmployeeNumber|creditLimit|
+--------------+----------------------------+---------------+----------------+-----------------+----------------------------+------------------------+-------------+--------+----------+---------+----------------------+-----------+
|103           |Atelier graphique           |Schmitt        |Carine          |40.32.2555       |54, rue Royale              |NULL                    |Nantes       |NULL    |44000     |France   |1370                  |21000.0    |
|112           |Signal Gift Stores          |King           |Jean            |70

## Q3) 1.3  Load data from flat file

In [8]:
def load_data_from_flatfile(spark :pyspark.sql.SparkSession, txt_file_name: str) \
                      -> pyspark.sql.DataFrame:
    '''
    Load data from flat file 'txt_file_name' separated with ':' and return a spark Dataframe
    PS: The data files for this assignment are in 'data' folder
    You can access full path of 'data/' folder using 'DATA_FOLDER' variable
    from constants.py
    '''
    return spark.read.csv(constants.DATA_FOLDER + txt_file_name, sep=':', header=True, inferSchema=True)

In [9]:
load_data_from_flatfile(spark, "payments.txt").show(truncate=False)

+--------------+-----------+-----------+---------+
|customerNumber|checkNumber|paymentDate|amount   |
+--------------+-----------+-----------+---------+
|103           |HQ336336   |2004-10-19 |6066.78  |
|103           |JM555205   |2003-06-05 |14571.44 |
|103           |OM314933   |2004-12-18 |1676.14  |
|112           |BO864823   |2004-12-17 |14191.12 |
|112           |HQ55022    |2003-06-06 |32641.98 |
|112           |ND748579   |2004-08-20 |33347.88 |
|114           |GG31455    |2003-05-20 |45864.03 |
|114           |MA765515   |2004-12-15 |82261.22 |
|114           |NP603840   |2003-05-31 |7565.08  |
|114           |NR27552    |2004-03-10 |44894.74 |
|119           |DB933704   |2004-11-14 |19501.82 |
|119           |LN373447   |2004-08-08 |47924.19 |
|119           |NG94694    |2005-02-22 |49523.67 |
|121           |DB889831   |2003-02-16 |50218.95 |
|121           |FD317790   |2003-10-28 |1491.38  |
|121           |KI831359   |2004-11-04 |17876.32 |
|121           |MA302151   |200

In [10]:
'''customers = load_data_from_csv(spark, "customers.csv") 
employees = load_data_from_csv(spark, "employees.csv")
offices = load_data_from_csv(spark, "offices.csv") 
orderDetails = load_data_from_mysql(spark, "classicmodels", 'orderdetails')   
orders = load_data_from_csv(spark, "orders.csv")
payments = load_data_from_flatfile(spark, "payments.txt") 
productLines = load_data_from_csv(spark, "productlines.csv")
products = load_data_from_csv(spark, "products.csv")

customers = customers.withColumn("creditLimit", col("creditLimit").cast(IntegerType()))
orders = orders.withColumn("shippedDate", to_date(col("shippedDate"),"yyyy-MM-dd"))'''

'customers = load_data_from_csv(spark, "customers.csv") \nemployees = load_data_from_csv(spark, "employees.csv")\noffices = load_data_from_csv(spark, "offices.csv") \norderDetails = load_data_from_mysql(spark, "classicmodels", \'orderdetails\')   \norders = load_data_from_csv(spark, "orders.csv")\npayments = load_data_from_flatfile(spark, "payments.txt") \nproductLines = load_data_from_csv(spark, "productlines.csv")\nproducts = load_data_from_csv(spark, "products.csv")\n\ncustomers = customers.withColumn("creditLimit", col("creditLimit").cast(IntegerType()))\norders = orders.withColumn("shippedDate", to_date(col("shippedDate"),"yyyy-MM-dd"))'

In [11]:
'''load_data_from_csv(spark, "customers.csv").createOrReplaceTempView("customers_sql")
load_data_from_csv(spark, "employees.csv").createOrReplaceTempView("employees_sql")
load_data_from_csv(spark, "offices.csv").createOrReplaceTempView("offices_sql")
load_data_from_mysql(spark, "classicmodels", 'orderdetails').createOrReplaceTempView("orderdetails_sql")
load_data_from_csv(spark, "orders.csv").createOrReplaceTempView("orders_sql")
load_data_from_flatfile(spark, "payments.txt").createOrReplaceTempView("payments_sql")
load_data_from_csv(spark, "productlines.csv").createOrReplaceTempView("productlines_sql")
load_data_from_csv(spark, "products.csv").createOrReplaceTempView("products_sql")'''

'load_data_from_csv(spark, "customers.csv").createOrReplaceTempView("customers_sql")\nload_data_from_csv(spark, "employees.csv").createOrReplaceTempView("employees_sql")\nload_data_from_csv(spark, "offices.csv").createOrReplaceTempView("offices_sql")\nload_data_from_mysql(spark, "classicmodels", \'orderdetails\').createOrReplaceTempView("orderdetails_sql")\nload_data_from_csv(spark, "orders.csv").createOrReplaceTempView("orders_sql")\nload_data_from_flatfile(spark, "payments.txt").createOrReplaceTempView("payments_sql")\nload_data_from_csv(spark, "productlines.csv").createOrReplaceTempView("productlines_sql")\nload_data_from_csv(spark, "products.csv").createOrReplaceTempView("products_sql")'

# 2. Transformations

## Q4) 2.1  Clean product's MSRP data

In [12]:
def clean_product_MSRP_column(spark :pyspark.sql.SparkSession) -> pyspark.sql.DataFrame:
    '''
    Due to a data entry issues MSRP, the selling price is lower than its buyPrice for some
    products. Change MSRP to 1.4 times of the buyPrice for such products and cast it to two
    decimal places.
    
    Return a spark dataframe with following columns.
    |productCode|productName|productLine|productScale|productVendor|productDescription|quantityInStock|buyPrice|MSRP|
    '''

    products = load_data_from_csv(spark, "products.csv")
    products = products.withColumn("MSRP", when(col("MSRP") < col("buyPrice"), round(1.4*col("buyPrice"), 2)) \
               .otherwise(col("MSRP")))
    return products


In [13]:
products = clean_product_MSRP_column(spark)

## Q5) 2.2  Apply explode on data

In [14]:
def explode_productLines_description(spark :pyspark.sql.SparkSession) -> pyspark.sql.DataFrame:
    ''' 
    Split the productLines description into words and explode the it.

    Return a spark dataframe with following columns.
    |productLine|description|
    +-----------+------------+
    |Motorcycles|         Our|
    |Motorcycles| motorcycles|
    |Motorcycles|         are|
       ... so on

    '''
    pLines = load_data_from_csv(spark, "productlines.csv")
    pLines = pLines.withColumn("textDescription", split(col("textDescription"), " "))
    return pLines.select(pLines.productLine, explode(pLines.textDescription).alias('description'))

In [15]:
explode_productLines_description(spark).show()

+------------+------------+
| productLine| description|
+------------+------------+
|Classic Cars|   Attention|
|Classic Cars|         car|
|Classic Cars|enthusiasts:|
|Classic Cars|        Make|
|Classic Cars|        your|
|Classic Cars|     wildest|
|Classic Cars|         car|
|Classic Cars|   ownership|
|Classic Cars|      dreams|
|Classic Cars|        come|
|Classic Cars|       true.|
|Classic Cars|     Whether|
|Classic Cars|         you|
|Classic Cars|         are|
|Classic Cars|     looking|
|Classic Cars|         for|
|Classic Cars|     classic|
|Classic Cars|      muscle|
|Classic Cars|       cars,|
|Classic Cars|       dream|
+------------+------------+
only showing top 20 rows



## Q6) 2.3  Struct

In [16]:

def get_customer_info(spark :pyspark.sql.SparkSession) -> pyspark.sql.DataFrame:
    '''
    Return a consolidated customer info using structs.
    
    Return a spark dataframe with following columns.
    |custID|custName|country|#orders|totalMoneySpent|creditLimit|
    '''
    customer_schema = StructType([ \
      StructField("custID",StringType(),True), \
      StructField("custName",StringType(),True), \
      StructField("country",StringType(),True), \
      StructField("#orders",IntegerType(),True), \
      StructField("totalMoneySpent", DoubleType(), True), \
      StructField("creditLimit", IntegerType(), True), \
    ])

    orders = load_data_from_csv(spark, "orders.csv")
    customers = load_data_from_csv(spark, "customers.csv")
    payments = load_data_from_flatfile(spark, "payments.txt")
    oc = customers.join(orders, 'customerNumber', 'left').drop(orders.customerNumber).groupBy('customerNumber').count()
    pc = customers.join(payments, 'customerNumber', 'left').drop(payments.customerNumber).groupBy('customerNumber').agg(sum('amount'))
    customer_details = pc.join(oc, 'customerNumber').join(customers, 'customerNumber').select('customerNumber', 'customerName', \
                       'country', 'count', col('sum(amount)').alias('totalAmount'), 'creditLimit')
    
    return customer_details.toDF(*customer_schema.fieldNames())

In [17]:
get_customer_info(spark).show()

+------+--------------------+-----------+-------+------------------+-----------+
|custID|            custName|    country|#orders|   totalMoneySpent|creditLimit|
+------+--------------------+-----------+-------+------------------+-----------+
|   148|Dragon Souveniers...|  Singapore|      5|         156251.03|   103800.0|
|   471|Australian Collec...|  Australia|      3|44920.759999999995|    60300.0|
|   496|   Kelly's Gift Shop|New Zealand|      4|         114497.19|   110000.0|
|   458|Corrida Auto Repl...|      Spain|      3|         112440.09|   104600.0|
|   481|  Raanan Stores, Inc|     Israel|      1|              null|        0.0|
|   321|Corporate Gift Id...|        USA|      4|         132340.78|   105000.0|
|   362|   Gifts4AllAges.com|        USA|      3|          33533.47|    41900.0|
|   211|King Kong Collect...|  Hong Kong|      2|          45480.79|    58600.0|
|   385|     Cruz & Sons Co.|Philippines|      3|           87468.3|    81500.0|
|   406|   Auto Canal+ Petit

## Q7) 2.4  UDF

In [18]:
@udf
def categorize_products(buy_price):
    if buy_price < 40:
        return "Affordable"
    elif buy_price < 80:
        return "Moderate"
    else:
        return "Expensive"
    
def retrun_product_category(spark :pyspark.sql.SparkSession) -> pyspark.sql.DataFrame:
    ''' 
    Add a new column 'productCategory' using above udf and return the updated dataframe
    Note: Please do not change original dataframe.

    Return a spark dataframe with following columns.
    |productCode|productName|productLine|productScale|productVendor|productDescription \
    |quantityInStock|buyPrice|MSRP|productCategory|
    '''
    products = load_data_from_csv(spark, "products.csv")
    products = products.withColumn('buyPrice', when(col('buyPrice').cast("integer").isNotNull(), \
                                col('buyPrice').cast("integer")).otherwise(0))
    return products.withColumn("productCategory", categorize_products(products["buyPrice"]))


In [19]:
retrun_product_category(spark).show()

+-----------+--------------------+----------------+------------+--------------------+--------------------+---------------+--------+------+---------------+
|productCode|         productName|     productLine|productScale|       productVendor|  productDescription|quantityInStock|buyPrice|  MSRP|productCategory|
+-----------+--------------------+----------------+------------+--------------------+--------------------+---------------+--------+------+---------------+
|   S10_1678|1969 Harley David...|     Motorcycles|       01:10|     Min Lin Diecast|This replica feat...|           7933|      48|  95.7|       Moderate|
|   S10_1949|1952 Alpine Renau...|    Classic Cars|       01:10|Classic Metal Cre...|Turnable front wh...|           7305|      98| 214.3|      Expensive|
|   S10_2016|1996 Moto Guzzi 1...|     Motorcycles|       01:10|Highway 66 Mini C...|Official Moto Guz...|           6625|      68|118.94|       Moderate|
|   S10_4698|2003 Harley-David...|     Motorcycles|       01:10|   Red

## Q8) 2.5  Rotate dataframe

In [20]:

def return_orders_by_day_by_status(spark :pyspark.sql.SparkSession) -> pyspark.sql.DataFrame:
    '''
    Return No. of orders by weekday(1 to 7) by order status.

    Return a spark dataframe with following columns.
    +-------+---------+--------+----------+-------+--------+-------+
    |weekDay|Cancelled|Disputed|In Process|On Hold|Resolved|Shipped|
    +-------+---------+--------+----------+-------+--------+-------+
    |      1|     null|    null|         2|      1|    null|      8|
           2
           3 and so on
    '''
    orders = load_data_from_csv(spark, "orders.csv")
    return orders.select(dayofweek('orderDate').alias('weekDay'), '*').groupBy('weekDay').pivot('status') \
    .agg(count('orderNumber').alias('totalOrders')).orderBy('weekDay')

In [21]:
return_orders_by_day_by_status(spark).show()

+-------+---------+--------+----------+-------+--------+-------+
|weekDay|Cancelled|Disputed|In Process|On Hold|Resolved|Shipped|
+-------+---------+--------+----------+-------+--------+-------+
|      1|     null|    null|         2|      1|    null|      8|
|      2|     null|       1|         2|   null|    null|     47|
|      3|        2|    null|         2|   null|       2|     50|
|      4|        1|    null|      null|   null|       2|     61|
|      5|        2|    null|      null|   null|    null|     57|
|      6|        1|       2|      null|      3|    null|     63|
|      7|     null|    null|      null|   null|    null|     17|
+-------+---------+--------+----------+-------+--------+-------+



## Q9) 2.6  Clean ProductLines text Description

In [22]:
def clean_productlines_description(spark :pyspark.sql.SparkSession) -> pyspark.sql.DataFrame:
    '''
    Take testDescription from productLines table and clean it.
    1. Convert to lower-case
    2. Remove all special characters except a-z and 0-9.
    3. Remove stop words (using StopWordsRemover.loadDefaultStopWords("english"))
    The final value(testDescription) must be a string.
    
    Return a spark dataframe with following columns.
    |productLine|htmlDescription|image|textDescription|
    '''
    
    def remove_special_chars(words):
        return [word.lower().replace("[^a-z0-9 ]", "") for word in words]
    
    pLines = load_data_from_csv(spark, "productlines.csv")

    pLines = pLines.withColumn("textDescription", split(col("textDescription"), " "))
    remove_special_chars_udf = udf(remove_special_chars, ArrayType(StringType()))
    pLines = pLines.withColumn("textDescription", remove_special_chars_udf(col("textDescription")))

    stopwords = StopWordsRemover.loadDefaultStopWords("english")
    stopwords_remover = StopWordsRemover(inputCol="textDescription", outputCol="textDescription_wos", stopWords=stopwords)
    pLines = stopwords_remover.transform(pLines).drop("textDescription")

    pLines = pLines.withColumn("textDescription", concat_ws(" ", "textDescription_wos")).drop('textDescription_wos')

    return pLines

In [51]:
# productLines.show()
clean_productlines_description(spark).show()

+----------------+---------------+-----+--------------------+
|     productLine|htmlDescription|image|     textDescription|
+----------------+---------------+-----+--------------------+
|    Classic Cars|           NULL| NULL|attention car ent...|
|     Motorcycles|           NULL| NULL|motorcycles state...|
|          Planes|           NULL| NULL|unique, diecast a...|
|           Ships|           NULL| NULL|perfect holiday a...|
|          Trains|           NULL| NULL|model trains rewa...|
|Trucks and Buses|           NULL| NULL|truck bus models ...|
|    Vintage Cars|           NULL| NULL|vintage car model...|
+----------------+---------------+-----+--------------------+



----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 55437)
Traceback (most recent call last):
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.11_3.11.1264.0_x64__qbz5n2kfra8p0\Lib\socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.11_3.11.1264.0_x64__qbz5n2kfra8p0\Lib\socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.11_3.11.1264.0_x64__qbz5n2kfra8p0\Lib\socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.11_3.11.1264.0_x64__qbz5n2kfra8p0\Lib\socketserver.py", line 755, in __init__
    self.handle()
  File "C:\Users\Vinni\AppData\Local\Packages\Pyt

# 3. Analytics

## Q10) 3.1 Your company wants to promote their top 5 employees who made maximum sales(orders)

In [25]:
def return_top_5_employees(spark :pyspark.sql.SparkSession) -> pyspark.sql.DataFrame:
    '''
    Return the top 5 employees(Sales Representatives) who made maximum no. of
    sales throughout the given timeperiod. You can consider all sales with all statuses.
    
    Return a spark dataframe with following columns.
    employeeNumber|firstName|lastName |TotalOrders
    '''

    orders = load_data_from_csv(spark, "orders.csv")
    employees = load_data_from_csv(spark, "employees.csv")
    customers = load_data_from_csv(spark, "customers.csv")
    
    top_5_emp = orders.join(customers, 'customerNumber').groupBy('customerNumber').agg(count('orderNumber').alias('totalOrders'), \
                first('salesRepEmployeeNumber').alias('salesRepEmployeeNumber')).groupBy('salesRepEmployeeNumber') \
                .agg(sum('totalOrders').alias('totalOrders')).orderBy(col('totalOrders').desc()).limit(5) \
                .join(employees, col("salesRepEmployeeNumber")==employees.employeeNumber) \
                .select('firstName', 'lastName', 'totalOrders').orderBy(col('totalOrders').desc())
    return top_5_emp

In [26]:
return_top_5_employees(spark)

DataFrame[firstName: string, lastName: string, totalOrders: bigint]

## Q11) 3.2 Imagine you are a sales representative. Mail your manager a report on cancelled orders

In [49]:
def report_cancelled_orders(spark :pyspark.sql.SparkSession) -> pyspark.sql.DataFrame:
    '''
    Return email adresses of your manager to send a report about the cancelled orders. Return
    Order details and a column to define if the cancelled order has been shipped already
    or not('Yes' or  'No').
     
    Return a spark dataframe with following columns.
    +-----------+---------+--------------------+--------------+----------------------+---------+--------------------+
    |orderNumber|isShipped|            comments|customerNumber|salesRepEmployeeNumber|reportsTo|               email|
    +-----------+---------+--------------------+--------------+----------------------+---------+--------------------+
    |      10167|       No|Customer called t...|           448|                  1504|     1102|gbondur@classicmo...|
           ...
           ... so on
    '''
    
    load_data_from_csv(spark, "customers.csv").createOrReplaceTempView("customers_sql")
    load_data_from_csv(spark, "employees.csv").createOrReplaceTempView("employees_sql")
    load_data_from_csv(spark, "orders.csv").createOrReplaceTempView("orders_sql")


    emails = spark.sql(" select oce.*, email from employees_sql\
              join (select oc.*, reportsTo from employees_sql \
              join (select orders.*, salesRepEmployeeNumber from customers_sql \
              join (select orderNumber, case when shippedDate = 'NULL' then 'No' else 'Yes' end as isShipped, \
              comments, customerNumber from orders_sql where status='Cancelled') as orders \
              on customers_sql.customerNumber == orders.customerNumber) as oc \
              on employees_sql.employeeNumber == oc.salesRepEmployeeNumber) as oce \
              on employees_sql.employeeNumber == oce.reportsTo \
            ")
    return emails


In [50]:
report_cancelled_orders(spark).show()

+-----------+---------+--------------------+--------------+----------------------+---------+--------------------+
|orderNumber|isShipped|            comments|customerNumber|salesRepEmployeeNumber|reportsTo|               email|
+-----------+---------+--------------------+--------------+----------------------+---------+--------------------+
|      10167|       No|Customer called t...|           448|                  1504|     1102|gbondur@classicmo...|
|      10179|      Yes|Customer cancelle...|           496|                  1612|     1088|wpatterson@classi...|
|      10248|       No|Order was mistake...|           131|                  1323|     1143|abow@classicmodel...|
|      10253|      Yes|Customer disputed...|           201|                  1501|     1102|gbondur@classicmo...|
|      10260|       No|Customer heard co...|           357|                  1612|     1088|wpatterson@classi...|
|      10262|       No|This customer fou...|           141|                  1370|     1

## Q12) 3.3 Return Top 5 big spenders

In [29]:
def return_top_5_big_spenders(spark :pyspark.sql.SparkSession) -> pyspark.sql.DataFrame:
    '''
    Return top 5 big spenders who had spent the most $(highest order value).
    
    Return a spark dataframe with following columns.
    customerNumber|customerName|totalOrderValue|
    '''
    
    customers_df = load_data_from_csv(spark, "customers.csv")
    orders_df = load_data_from_csv(spark, "orders.csv")
    orderdetails_df = load_data_from_mysql(spark, "classicmodels", 'orderdetails')
    merged_data = customers_df.join(orders_df, "customerNumber").join(orderdetails_df, "orderNumber")
    # Calculate the total order value for each customer
    total_order_value = merged_data.groupBy("customerNumber", "customerName") \
    .agg(sum(merged_data.priceEach * merged_data.quantityOrdered).alias("totalOrderValue")) \
    .orderBy(col('totalOrderValue').desc()).limit(5)
    # print(total_order_value.show(truncate=False))
    # Return the total order value DataFrame
    return total_order_value

In [30]:
return_top_5_big_spenders(spark).show(truncate=False)

+--------------+----------------------------+---------------+
|customerNumber|customerName                |totalOrderValue|
+--------------+----------------------------+---------------+
|141           |Euro+ Shopping Channel      |820689.54      |
|124           |Mini Gifts Distributors Ltd.|591827.34      |
|114           |Australian Collectors, Co.  |180585.07      |
|151           |Muscle Machine Inc          |177913.95      |
|119           |La Rochelle Gifts           |158573.12      |
+--------------+----------------------------+---------------+



## Q13) 3.4 Return Top 5 big spenders(countries)

In [31]:
def return_top_5_big_spend_countries(spark :pyspark.sql.SparkSession) -> pyspark.sql.DataFrame:
    '''
    Return top 5 big countries which had spent the most $(highest order value).
    
    Return a spark dataframe with following columns.
    |country|totalOrderValue|
    '''
    
    customers_df = load_data_from_csv(spark, "customers.csv")
    orders_df = load_data_from_csv(spark, "orders.csv")
    orderdetails_df = load_data_from_mysql(spark, "classicmodels", 'orderdetails')
    merged_data = customers_df.join(orders_df, "customerNumber").join(orderdetails_df, "orderNumber")
    # Calculate the total order value for each customer
    total_order_value = merged_data.groupBy("country") \
    .agg(sum(merged_data.priceEach * merged_data.quantityOrdered).alias("totalOrderValue")) \
    .orderBy(col('totalOrderValue').desc()).limit(5)
    # print(total_order_value.show(truncate=False))
    # Return the total order value DataFrame
    return total_order_value

In [32]:
return_top_5_big_spend_countries(spark).show(truncate=False)

+-----------+---------------+
|country    |totalOrderValue|
+-----------+---------------+
|USA        |3273280.05     |
|Spain      |1099389.09     |
|France     |1007374.02     |
|Australia  |562582.59      |
|New Zealand|476847.01      |
+-----------+---------------+

