In [1]:
import pyspark

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, IntegerType
from pyspark.sql.types import StructType, StructField, DoubleType

import sys
from pathlib import Path
sys.path.append(str(Path.cwd().parent))

In [None]:
# Create the Spark session (or reuse one that is already running) and add the
# MySQL Connector/J jar to the driver's extra classpath.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("appName")
    .config("spark.driver.extraClassPath", "C:\\mysql-connector-j-8.0.33.jar")
    .getOrCreate()
)

# 1. Load data

## Q1) 1.1 Load data from MySQL

In [4]:
def load_data_from_mysql(
    spark: pyspark.sql.SparkSession, db_name: str, table_name: str
) -> pyspark.sql.DataFrame:
    """Load the MySQL table `table_name` from database `db_name` over JDBC.

    The connection settings are the ones fixed by the assignment:
        JDBC driver: 'com.mysql.cj.jdbc.Driver'
        Hostname:    'localhost'
        Port:        3306
        Username:    'root'
        Password:    'pass@word1'

    Args:
        spark: Spark session used to perform the read.
        db_name: Name of the MySQL database, e.g. 'classicmodels'.
        table_name: Name of the table to read, e.g. 'order_details'.

    Returns:
        The table as a Spark DataFrame.
    """
    jdbc_url = f"jdbc:mysql://localhost:3306/{db_name}"
    # NOTE: the credentials are hard-coded only because the assignment
    # prescribes them. They are passed as reader options rather than embedded
    # in the URL, so the '@' in the password needs no escaping.
    return (
        spark.read.format("jdbc")
        .option("url", jdbc_url)
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("dbtable", table_name)
        .option("user", "root")
        .option("password", "pass@word1")
        .load()
    )


## Q2) 1.2 Load data from csv

In [5]:
def load_data_from_csv(spark, csv_file_name):
    """Read the CSV file `csv_file_name` and return it as a Spark DataFrame.

    Not implemented yet: the body is a placeholder, so calling this
    currently returns None.
    """
    # your code goes here
    pass

## Q3) 1.3  Load data from flat file

In [None]:
def load_data_from_flatfile(
    spark: pyspark.sql.SparkSession, txt_file_name: str
) -> pyspark.sql.DataFrame:
    """Read the ':'-separated flat file `txt_file_name` into a Spark DataFrame.

    PS: The data files for this assignment live in the 'data' folder. The full
    path of that folder is available as the 'DATA_FOLDER' variable in
    constants.py.

    Not implemented yet: the body is a placeholder, so calling this
    currently returns None.
    """
    # your code goes here
    pass

# 2. Transformations

## Q4) 2.1  Clean product's MSRP data

In [None]:
def clean_product_MSRP_column(
    spark: pyspark.sql.SparkSession,
) -> pyspark.sql.DataFrame:
    """Repair MSRP values that are lower than the product's buyPrice.

    Due to a data-entry issue, the MSRP (the selling price) of some products is
    lower than their buyPrice. For those products, change MSRP to 1.4 times
    the buyPrice and cast it to two decimal places.

    Return a spark dataframe with following columns.
    |productCode|productName|productLine|productScale|productVendor|productDescription|quantityInStock|buyPrice|MSRP|

    Not implemented yet: the body is a placeholder, so calling this
    currently returns None.
    """
    # your code goes here
    pass

## Q5) 2.2  Apply explode on data

In [None]:
def explode_productLines_description(
    spark: pyspark.sql.SparkSession,
) -> pyspark.sql.DataFrame:
    """Split each productLines description into words and explode it.

    The result has one row per (productLine, word) pair.

    Return a spark dataframe with following columns.
    |productLine|description|
    +-----------+------------+
    |Motorcycles|         Our|
    |Motorcycles| motorcycles|
    |Motorcycles|         are|
       ... so on

    Not implemented yet: the body is a placeholder, so calling this
    currently returns None.
    """
    # your code goes here
    pass

## Q6) 2.3  Struct

In [None]:
def get_customer_info(
    spark: pyspark.sql.SparkSession,
) -> pyspark.sql.DataFrame:
    """Return consolidated customer info built with structs.

    Return a spark dataframe with following columns.
    |custID|custName|country|#orders|totalMoneySpent|creditLimit|

    Not implemented yet: only the target schema is defined below; the rest
    of the body is a placeholder, so calling this currently returns None.
    """
    # Schema of the consolidated result; every field is nullable.
    customer_schema = StructType(
        [
            StructField("custID", StringType(), True),
            StructField("custName", StringType(), True),
            StructField("country", StringType(), True),
            StructField("#orders", IntegerType(), True),
            StructField("totalMoneySpent", DoubleType(), True),
            StructField("creditLimit", IntegerType(), True),
        ]
    )
    # your code goes here
    pass



## Q7) 2.4  UDF

In [None]:
@udf(returnType=StringType())
def categorize_products(buy_price):
    """Spark UDF that buckets a product's buy price into a price category.

    Args:
        buy_price: Buy price of one product, or None when the cell is null.

    Returns:
        "Affordable" for prices below 40, "Moderate" for prices from 40 up to
        (but not including) 80, "Expensive" for 80 and above, and None when
        the price is null.
    """
    # Spark hands SQL NULL to a Python UDF as None; comparing None with a
    # number raises TypeError in Python 3, which would fail the whole job.
    if buy_price is None:
        return None
    if buy_price < 40:
        return "Affordable"
    if buy_price < 80:
        return "Moderate"
    return "Expensive"
    
def retrun_product_category(
    spark: pyspark.sql.SparkSession,
) -> pyspark.sql.DataFrame:
    """Add a 'productCategory' column computed with the `categorize_products` udf.

    Note: Please do not change the original dataframe; return the updated
    dataframe instead.

    Return a spark dataframe with following columns.
    |productCode|productName|productLine|productScale|productVendor|productDescription|quantityInStock|buyPrice|MSRP|productCategory|

    Not implemented yet: the body is a placeholder, so calling this
    currently returns None.
    """
    # your code goes here
    pass

## Q8) 2.5  Rotate dataframe

In [None]:
def return_orders_by_day_by_status(
    spark: pyspark.sql.SparkSession,
) -> pyspark.sql.DataFrame:
    """Return the number of orders per weekday (1 to 7), by order status.

    The result has one row per weekday and one column per order status.

    Return a spark dataframe with following columns.
    +-------+---------+--------+----------+-------+--------+-------+
    |weekDay|Cancelled|Disputed|In Process|On Hold|Resolved|Shipped|
    +-------+---------+--------+----------+-------+--------+-------+
    |      1|     null|    null|         2|      1|    null|      8|
           2
           3 and so on

    Not implemented yet: the body is a placeholder, so calling this
    currently returns None.
    """
    # your code goes here
    pass

## Q9) 2.6  Clean ProductLines text Description

In [None]:
def clean_productlines_description(
    spark: pyspark.sql.SparkSession,
) -> pyspark.sql.DataFrame:
    """Clean the textDescription column of the productLines table.

    Cleaning steps:
    1. Convert to lower-case
    2. Remove all special characters except a-z and 0-9.
    3. Remove stop words (using StopWordsRemover.loadDefaultStopWords("english"))
    The final value (textDescription) must be a string.

    Return a spark dataframe with following columns.
    |productLine|htmlDescription|image|textDescription|

    Not implemented yet: the body is a placeholder, so calling this
    currently returns None.
    """
    # your code goes here
    pass

# 3. Analytics

## Q10) 3.1 Your company wants to promote its top 5 employees who made the most sales (orders)



In [9]:
def return_top_5_employees(
    spark: pyspark.sql.SparkSession,
) -> pyspark.sql.DataFrame:
    """Return the top 5 employees (Sales Representatives) by number of sales.

    Count every sale made throughout the given time period; sales with all
    statuses may be considered.

    Return a spark dataframe with following columns.
    employeeNumber|firstName|lastName |TotalOrders

    Not implemented yet: the body is a placeholder, so calling this
    currently returns None.
    """
    # your code goes here
    pass

## Q11) 3.2 Imagine you are a sales representative. Mail your manager a report on cancelled orders

In [11]:
def report_cancelled_orders(
    spark: pyspark.sql.SparkSession,
) -> pyspark.sql.DataFrame:
    """Build the cancelled-orders report to send to your manager.

    Return the email addresses of your manager so the report can be sent,
    together with the order details and a column that says whether the
    cancelled order has already been shipped or not ('Yes' or 'No').

    Return a spark dataframe with following columns.
    +-----------+---------+--------------------+--------------+----------------------+---------+--------------------+
    |orderNumber|isShipped|            comments|customerNumber|salesRepEmployeeNumber|reportsTo|               email|
    +-----------+---------+--------------------+--------------+----------------------+---------+--------------------+
    |      10167|       No|Customer called t...|           448|                  1504|     1102|gbondur@classicmo...|
           ...
           ... so on

    Not implemented yet: the body is a placeholder, so calling this
    currently returns None.
    """
    # your code goes here
    pass


## Q12) 3.3 Return Top 5 big spenders

In [140]:
def return_top_5_big_spenders(
    spark: pyspark.sql.SparkSession,
) -> pyspark.sql.DataFrame:
    """Return the top 5 big spenders: customers who spent the most $.

    Customers are ranked by highest order value.

    Return a spark dataframe with following columns.
    customerNumber|customerName|totalOrderValue|

    Not implemented yet: the body is a placeholder, so calling this
    currently returns None.
    """
    # your code goes here
    pass

## Q13) 3.4 Return Top 5 big spenders(countries)

In [None]:
def return_top_5_big_spend_countries(
    spark: pyspark.sql.SparkSession,
) -> pyspark.sql.DataFrame:
    """Return the top 5 countries that spent the most $.

    Countries are ranked by highest order value.

    Return a spark dataframe with following columns.
    |country|totalOrderValue|

    Not implemented yet: the body is a placeholder, so calling this
    currently returns None.
    """
    # your code goes here
    pass