In [10]:
import os
import sys

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window

In [None]:
# Make the parent of the current working directory importable (presumably so the
# `modules` package imported in the next cell resolves -- confirm project layout).
# The membership check keeps this cell idempotent: re-running it does not append
# the same entry to sys.path a second time.
project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))
if project_root not in sys.path:
    sys.path.append(project_root)

In [12]:
from modules.utils import read_yaml_config, read_config_parameters

In [None]:
# Build (or reuse, if one is already active) the Spark session for this notebook
spark = (
    SparkSession.builder
    .appName('Upstart13Analysis')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/06 01:17:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [15]:
# Load the project settings from config.yaml in the parent directory
config = read_yaml_config(
    file_path='../config.yaml',
)

In [16]:
# Look up the gold-layer Parquet locations under the config's 'gold' section,
# one lookup per dataset, in the same order as the names on the left.
gld_path_products, gld_path_publish_orders = (
    read_config_parameters(config_file=config, structure=['gold', dataset])
    for dataset in ('products', 'publish_orders')
)

In [18]:
# Load both gold-layer datasets as DataFrames; the configured paths are
# resolved relative to the parent directory.
products, publish_orders = (
    spark.read.parquet(f'../{gold_path}')
    for gold_path in (gld_path_products, gld_path_publish_orders)
)

                                                                                

## Analysis Questions

#### 1. Which color generated the highest revenue each year?

In [19]:
# Attach to each order line the colour of the product it refers to.
orders_with_color = publish_orders.join(
    products.select('ProductID', 'Color'),
    on='ProductID',
    how='inner',
)

# Total revenue per (order year, colour).
revenue_by_year_color = (
    orders_with_color
    .groupBy(F.year('OrderDate').alias('OrderYear'), 'Color')
    .agg(F.sum('TotalLineExtendedPrice').alias('TotalRevenue'))
)

# Rank colours within each year, highest revenue first. dense_rank gives tied
# colours the same rank, so a tie for first place yields several rows.
revenue_rank_in_year = F.dense_rank().over(
    Window.partitionBy('OrderYear').orderBy(F.desc('TotalRevenue'))
)

# Keep only the top-ranked colour(s) of every year, ordered chronologically.
top_color_per_year = (
    revenue_by_year_color
    .withColumn('Rank', revenue_rank_in_year)
    .where(F.col('Rank') == 1)
    .select('OrderYear', 'Color', 'TotalRevenue')
    .orderBy('OrderYear')
)
top_color_per_year.show()

[Stage 3:>                                                          (0 + 2) / 2]

+---------+------+--------------------+
|OrderYear| Color|        TotalRevenue|
+---------+------+--------------------+
|     2021|   Red|   6019614.015699884|
|     2022| Black|1.4005242975200394E7|
|     2023| Black|1.5047694369201014E7|
|     2024|Yellow|   6480746.072200298|
+---------+------+--------------------+



                                                                                

#### 2. What is the average LeadTimeInBusinessDays by ProductCategoryName?

In [20]:
# Bring the product category name onto every order line via its ProductID.
orders_with_category = publish_orders.join(
    products.select('ProductID', 'ProductCategoryName'),
    on='ProductID',
    how='inner',
)

# Mean lead time per category, rounded to two decimals, ordered by category name.
avg_lead_time = F.round(F.avg('LeadTimeInBusinessDays'), 2)
avg_lead_time_by_category = (
    orders_with_category
    .groupBy('ProductCategoryName')
    .agg(avg_lead_time.alias('AvgLeadTimeInBusinessDays'))
    .orderBy('ProductCategoryName')
)
avg_lead_time_by_category.show()

[Stage 10:>                                                         (0 + 2) / 2]

+-------------------+-------------------------+
|ProductCategoryName|AvgLeadTimeInBusinessDays|
+-------------------+-------------------------+
|               null|                     4.72|
|        Accessories|                      4.7|
|              Bikes|                     4.67|
|           Clothing|                     4.71|
|         Components|                     4.67|
+-------------------+-------------------------+



                                                                                

In [21]:
# Shut down the Spark session created earlier now that the analysis is finished
spark.stop()