In [1]:
import findspark
findspark.init()  # make the local Spark installation's pyspark importable

import pyspark  # noqa: F401  (the session itself is built in the SparkSession cell)

# NOTE: deliberately no `pyspark.SparkContext()` here. A context created at this
# point is reused by `SparkSession.builder...getOrCreate()` later on, and the
# builder's master / spark.executor.memory / spark.cores.max /
# spark.sql.catalogImplementation settings then never take effect.

In [None]:
# Partitions Spark SQL uses for shuffles (joins / aggregations). Kept small
# because the session below runs against a local master.
SHUFFLE_PARTITIONS = 2

# Root of the warehouse parquet tables. The URI scheme is required: a bare
# 's3/...' is resolved relative to the local working directory, not S3.
# NOTE(review): on open-source Hadoop (non-EMR) this typically needs 's3a://'.
INPUT_LOC = 's3://somebucketname/tpcds-dwh'

In [73]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.sql import Window
from pyspark.sql.types import IntegerType, DecimalType, StringType, DateType, LongType, FloatType

# Build (or reuse) the SparkSession that every query below runs on, with Hive
# support and the AWS Glue Data Catalog as the Hive metastore client.
# NOTE(review): master 'local' runs a single worker thread inside the driver,
# so spark.cores.max / spark.executor.memory have little or no effect here;
# 'local[*]' and spark.driver.memory are the usual knobs for local runs.
spark = SparkSession.builder \
    .master('local') \
    .appName('BI_queries') \
    .config('spark.executor.memory', '5gb') \
    .config("spark.cores.max", "6") \
    .config("spark.sql.shuffle.partitions", SHUFFLE_PARTITIONS) \
    .config("spark.sql.catalogImplementation","hive") \
    .config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
    .enableHiveSupport() \
    .getOrCreate()

# Keep the conventional `sc` handle, bound to the session's own context.
sc = spark.sparkContext

In [85]:
def _load_parquet_view(table_name):
    """Load one warehouse table and register it as a Spark SQL temp view.

    Reads the parquet directory ``{INPUT_LOC}/{table_name}/`` and registers it
    under ``table_name`` so the SQL cells can query it by that name.
    Returns the DataFrame for use with the DataFrame API.
    """
    table_df = spark.read.parquet(f'{INPUT_LOC}/{table_name}/')
    table_df.createOrReplaceTempView(table_name)
    return table_df


# d_* tables (joined as DIM in the queries below)
d_customer = _load_parquet_view('d_customer')
d_date_dim = _load_parquet_view('d_date_dim')
d_item = _load_parquet_view('d_item')
d_store = _load_parquet_view('d_store')
d_item_category = _load_parquet_view('d_item_category')
d_store_city = _load_parquet_view('d_store_city')
d_store_country = _load_parquet_view('d_store_country')

# f_* tables
f_store_sales = _load_parquet_view('f_store_sales')

# aggregations
a_store_sales_store_lvl = _load_parquet_view('a_store_sales_store_lvl')
a_store_sales_customer_lvl = _load_parquet_view('a_store_sales_customer_lvl')
a_store_sales_item_lvl = _load_parquet_view('a_store_sales_item_lvl')
a_store_sales_store_quantity_ma = _load_parquet_view('a_store_sales_store_quantity_ma')

In [25]:
# 1a. Top 10 products (excl. products with the name UNKNOWN) in terms of total sales (sql code)
# Ranked on sum_ss_quantity (quantity sold) per item.
# NOTE: `<> 'UNKNOWN'` also excludes items whose name is NULL (SQL three-valued logic).
# ss_item_sk is a secondary sort key so ties on sum_sold at the top-10 cut-off
# are resolved deterministically rather than by partition order.
spark.sql('''
   select ss_item_sk, sum(sum_ss_quantity) as sum_sold, i_product_name
   from a_store_sales_item_lvl AG
   inner join (select i_item_sk, i_product_name from d_item where i_product_name <> 'UNKNOWN') DIM on AG.ss_item_sk = DIM.i_item_sk
   group by ss_item_sk, i_product_name
   order by sum_sold desc, ss_item_sk
   limit 10
''').show()

+----------+--------+--------------------+
|ss_item_sk|sum_sold|      i_product_name|
+----------+--------+--------------------+
|     14119|    1017|n stoughtoughtese...|
|      2881|     897|oughteingeingable...|
|     14701|     862|oughtbarationeseo...|
|      9799|     830|n stn stationn st...|
|      2233|     819|pripriableable   ...|
|     14059|     816|n stantibareseoug...|
|     11671|     803|oughtationcallyou...|
|     10645|     787|antiesecallybarou...|
|      3140|     783|bareseoughtpri   ...|
|     16867|     778|ationcallyeingcal...|
+----------+--------+--------------------+



In [29]:
# 1b. Top 10 products (excl. products with the name UNKNOWN) in terms of total sales (spark code)
# Ranked on sum_ss_quantity (quantity sold) per item.
# NOTE: `!=` drops items whose i_product_name is NULL as well as 'UNKNOWN'.
# ss_item_sk breaks ties on sum_sold so the top-10 cut-off is deterministic.
a_store_sales_item_lvl \
    .join(d_item.filter(sf.col('i_product_name') != 'UNKNOWN').select(sf.col('i_item_sk').alias('ss_item_sk'), 'i_product_name'), 'ss_item_sk', 'inner') \
    .groupBy('ss_item_sk', 'i_product_name') \
    .agg(sf.sum('sum_ss_quantity').alias('sum_sold')) \
    .orderBy(sf.desc('sum_sold'), sf.asc('ss_item_sk')) \
    .limit(10) \
    .show()

+----------+--------------------+--------+
|ss_item_sk|      i_product_name|sum_sold|
+----------+--------------------+--------+
|     14119|n stoughtoughtese...|    1017|
|      2881|oughteingeingable...|     897|
|     14701|oughtbarationeseo...|     862|
|      9799|n stn stationn st...|     830|
|      2233|pripriableable   ...|     819|
|     14059|n stantibareseoug...|     816|
|     11671|oughtationcallyou...|     803|
|     10645|antiesecallybarou...|     787|
|      3140|bareseoughtpri   ...|     783|
|     16867|ationcallyeingcal...|     778|
+----------+--------------------+--------+



In [31]:
# 2a. Top 3 stores (excl. stores with the name UNKNOWN) in terms of total sales (sql code)
# Ranked on sum_ss_quantity (quantity sold) per store.
# NOTE: `<> 'UNKNOWN'` also excludes stores whose name is NULL.
# ss_store_sk is a secondary sort key so ties at the top-3 cut-off are deterministic.
spark.sql('''
   select ss_store_sk, sum(sum_ss_quantity) as sum_sold, s_store_name
   from a_store_sales_store_lvl AG
   inner join (select s_store_sk, s_store_name from d_store where s_store_name <> 'UNKNOWN') DIM on AG.ss_store_sk = DIM.s_store_sk
   group by ss_store_sk, s_store_name
   order by sum_sold desc, ss_store_sk
   limit 3
''').show()

+-----------+--------+------------+
|ss_store_sk|sum_sold|s_store_name|
+-----------+--------+------------+
|          2|  484939|        able|
|          1|  473032|       ought|
|         10|  458419|         bar|
+-----------+--------+------------+



In [33]:
# 2b. Top 3 stores (excl. stores with the name UNKNOWN) in terms of total sales (spark code)
# Ranked on sum_ss_quantity (quantity sold) per store.
# NOTE: `!=` drops stores whose s_store_name is NULL as well as 'UNKNOWN'.
# ss_store_sk breaks ties on sum_sold so the top-3 cut-off is deterministic.
a_store_sales_store_lvl \
    .join(d_store.filter(sf.col('s_store_name') != 'UNKNOWN').select(sf.col('s_store_sk').alias('ss_store_sk'), 's_store_name'), 'ss_store_sk', 'inner') \
    .groupBy('ss_store_sk', 's_store_name') \
    .agg(sf.sum('sum_ss_quantity').alias('sum_sold')) \
    .orderBy(sf.desc('sum_sold'), sf.asc('ss_store_sk')) \
    .limit(3) \
    .show()

+-----------+------------+--------+
|ss_store_sk|s_store_name|sum_sold|
+-----------+------------+--------+
|          2|        able|  484939|
|          1|       ought|  473032|
|         10|         bar|  458419|
+-----------+------------+--------+



In [37]:
# 3a. Top 20 customers (excl. customers with first and last names UNKNOWN) in terms of total sales (sql code)
# Ranked on sum_ss_quantity (quantity sold) per customer.
# NOTE: the `<>` filters also exclude customers whose first or last name is NULL.
# ss_customer_sk is a secondary sort key so ties at the top-20 cut-off are deterministic.
spark.sql('''
   select ss_customer_sk, sum(sum_ss_quantity) as sum_sold, c_first_name, c_last_name
   from a_store_sales_customer_lvl AG
   inner join (select c_customer_sk, c_first_name, c_last_name from d_customer where c_first_name <> 'UNKNOWN' and c_last_name <> 'UNKNOWN') DIM on AG.ss_customer_sk = DIM.c_customer_sk
   group by ss_customer_sk, c_first_name, c_last_name
   order by sum_sold desc, ss_customer_sk
   limit 20
''').show()

+--------------+--------+--------------------+--------------------+
|ss_customer_sk|sum_sold|        c_first_name|         c_last_name|
+--------------+--------+--------------------+--------------------+
|         19469|    1841|Sally               |Andrews          ...|
|          1691|    1823|Stephan             |Chavez           ...|
|         87020|    1612|George              |Joyner           ...|
|         17327|    1514|Kathy               |Pierce           ...|
|         90438|    1474|Ann                 |Weaver           ...|
|         94719|    1447|James               |Cruz             ...|
|         58240|    1445|Mark                |Burrow           ...|
|         67919|    1411|Kristine            |Oneill           ...|
|         60349|    1401|Richard             |Gray             ...|
|         24926|    1388|Joyce               |Adkins           ...|
|         90279|    1386|Lori                |Wertz            ...|
|          4801|    1379|Ashley              |He

In [42]:
# 3b. Top 20 customers (excl. customers with first and last names UNKNOWN) in terms of total sales (spark code)
# Ranked on sum_ss_quantity (quantity sold) per customer.
# NOTE: the `!=` filters also drop customers whose first or last name is NULL.
# ss_customer_sk breaks ties on sum_sold so the top-20 cut-off is deterministic.
a_store_sales_customer_lvl \
    .join(d_customer.filter((sf.col('c_first_name') != 'UNKNOWN') & (sf.col('c_last_name') != 'UNKNOWN')).select(sf.col('c_customer_sk').alias('ss_customer_sk'), 'c_first_name', 'c_last_name'), 'ss_customer_sk', 'inner') \
    .groupBy('ss_customer_sk', 'c_first_name', 'c_last_name') \
    .agg(sf.sum('sum_ss_quantity').alias('sum_sold')) \
    .orderBy(sf.desc('sum_sold'), sf.asc('ss_customer_sk')) \
    .limit(20) \
    .show()

+--------------+--------------------+--------------------+--------+
|ss_customer_sk|        c_first_name|         c_last_name|sum_sold|
+--------------+--------------------+--------------------+--------+
|         19469|Sally               |Andrews          ...|    1841|
|          1691|Stephan             |Chavez           ...|    1823|
|         87020|George              |Joyner           ...|    1612|
|         17327|Kathy               |Pierce           ...|    1514|
|         90438|Ann                 |Weaver           ...|    1474|
|         94719|James               |Cruz             ...|    1447|
|         58240|Mark                |Burrow           ...|    1445|
|         67919|Kristine            |Oneill           ...|    1411|
|         60349|Richard             |Gray             ...|    1401|
|         24926|Joyce               |Adkins           ...|    1388|
|         90279|Lori                |Wertz            ...|    1386|
|          4801|Ashley              |Healy      

In [46]:
# 4a. Total sales for the set of products with the name UNKNOWN (sql code)
# Sums sum_ss_quantity over every item named 'UNKNOWN'. The filter leaves a
# single product name, so the grouping yields at most one row.
unknown_products_sql = '''
   select i_product_name, sum(sum_ss_quantity) as sum_sold
   from a_store_sales_item_lvl AG
   inner join (select i_item_sk, i_product_name from d_item where i_product_name = 'UNKNOWN') DIM on AG.ss_item_sk = DIM.i_item_sk
   group by i_product_name
   order by sum_sold desc
   limit 1
    
'''
unknown_products_result = spark.sql(unknown_products_sql)
unknown_products_result.show()

+--------------+--------+
|i_product_name|sum_sold|
+--------------+--------+
|       UNKNOWN|  341731|
+--------------+--------+



In [59]:
# 4b. Total sales for the set of products with the name UNKNOWN (spark code)
# Sums sum_ss_quantity over every item named 'UNKNOWN'. Uses an inner join, as
# the SQL formulation does, so UNKNOWN items with no sales rows do not produce a
# row with a null sum.
d_item \
    .select(sf.col('i_item_sk').alias('ss_item_sk'), 'i_product_name') \
    .filter(sf.col('i_product_name') == 'UNKNOWN') \
    .join(a_store_sales_item_lvl.select('ss_item_sk', 'sum_ss_quantity'), 'ss_item_sk', 'inner') \
    .groupBy('i_product_name') \
    .agg(sf.sum('sum_ss_quantity').alias('sum_sold')) \
    .orderBy(sf.desc('sum_sold')) \
    .limit(1) \
    .show()

+--------------+--------+
|i_product_name|sum_sold|
+--------------+--------+
|       UNKNOWN|  341731|
+--------------+--------+



In [50]:
# 5a. Total sales for the set of stores with the name UNKNOWN (sql code)
# Sums sum_ss_quantity over every store named 'UNKNOWN'. The filter leaves a
# single store name, so the grouping yields at most one row.
unknown_stores_sql = '''
   select s_store_name, sum(sum_ss_quantity) as sum_sold
   from a_store_sales_store_lvl AG
   inner join (select s_store_sk, s_store_name from d_store where s_store_name = 'UNKNOWN') DIM on AG.ss_store_sk = DIM.s_store_sk
   group by s_store_name
   order by sum_sold desc
   limit 1
    
'''
unknown_stores_result = spark.sql(unknown_stores_sql)
unknown_stores_result.show()

+------------+--------+
|s_store_name|sum_sold|
+------------+--------+
|     UNKNOWN|   65751|
+------------+--------+



In [60]:
# 5b. Total sales for the set of stores with the name UNKNOWN (spark code)
# Sums sum_ss_quantity over every store named 'UNKNOWN'. Uses an inner join, as
# the SQL formulation does, so UNKNOWN stores with no sales rows do not produce
# a row with a null sum.
d_store \
    .select(sf.col('s_store_sk').alias('ss_store_sk'), 's_store_name') \
    .filter(sf.col('s_store_name') == 'UNKNOWN') \
    .join(a_store_sales_store_lvl.select('ss_store_sk', 'sum_ss_quantity'), 'ss_store_sk', 'inner') \
    .groupBy('s_store_name') \
    .agg(sf.sum('sum_ss_quantity').alias('sum_sold')) \
    .orderBy(sf.desc('sum_sold')) \
    .limit(1) \
    .show()

+------------+--------+
|s_store_name|sum_sold|
+------------+--------+
|     UNKNOWN|   65751|
+------------+--------+



In [61]:
# 6a. Total sales for the set of customers with the first and last name UNKNOWN (sql code)
# Sums sum_ss_quantity over customers whose first AND last name are 'UNKNOWN'.
# Both names are fixed by the filter, so the grouping yields at most one row.
unknown_customers_sql = '''
   select c_first_name, c_last_name, sum(sum_ss_quantity) as sum_sold
   from a_store_sales_customer_lvl AG
   inner join (select c_customer_sk, c_first_name, c_last_name from d_customer where c_first_name = 'UNKNOWN' and c_last_name = 'UNKNOWN') DIM on AG.ss_customer_sk = DIM.c_customer_sk
   group by c_first_name, c_last_name
   order by sum_sold desc
   limit 1
    
'''
unknown_customers_result = spark.sql(unknown_customers_sql)
unknown_customers_result.show()

+------------+-----------+--------+
|c_first_name|c_last_name|sum_sold|
+------------+-----------+--------+
|     UNKNOWN|    UNKNOWN|  442607|
+------------+-----------+--------+



In [62]:
# 6b. Total sales for the set of customers with the first and last name UNKNOWN (spark code)
# Sums sum_ss_quantity over customers whose first AND last name are 'UNKNOWN'.
# Uses an inner join, as the SQL formulation does, so UNKNOWN customers with no
# sales rows do not produce a row with a null sum.
d_customer \
    .select(sf.col('c_customer_sk').alias('ss_customer_sk'), 'c_first_name', 'c_last_name') \
    .filter((sf.col('c_first_name') == 'UNKNOWN') & (sf.col('c_last_name') == 'UNKNOWN')) \
    .join(a_store_sales_customer_lvl.select('ss_customer_sk', 'sum_ss_quantity'), 'ss_customer_sk', 'inner') \
    .groupBy('c_first_name', 'c_last_name') \
    .agg(sf.sum('sum_ss_quantity').alias('sum_sold')) \
    .orderBy(sf.desc('sum_sold')) \
    .limit(1) \
    .show()

+------------+-----------+--------+
|c_first_name|c_last_name|sum_sold|
+------------+-----------+--------+
|     UNKNOWN|    UNKNOWN|  442607|
+------------+-----------+--------+



In [63]:
# 6c. Cross-check of the UNKNOWN-customer total: the Spark SQL and DataFrame API
#     formulations must return identical rows. Both sides use an inner join so
#     they are directly comparable; the checked result is then displayed.
unknown_customer_sales_sql = spark.sql('''
   select c_first_name, c_last_name, sum(sum_ss_quantity) as sum_sold
   from a_store_sales_customer_lvl AG
   inner join (select c_customer_sk, c_first_name, c_last_name from d_customer where c_first_name = 'UNKNOWN' and c_last_name = 'UNKNOWN') DIM on AG.ss_customer_sk = DIM.c_customer_sk
   group by c_first_name, c_last_name
''')

unknown_customer_sales_df = d_customer \
    .select(sf.col('c_customer_sk').alias('ss_customer_sk'), 'c_first_name', 'c_last_name') \
    .filter((sf.col('c_first_name') == 'UNKNOWN') & (sf.col('c_last_name') == 'UNKNOWN')) \
    .join(a_store_sales_customer_lvl.select('ss_customer_sk', 'sum_ss_quantity'), 'ss_customer_sk', 'inner') \
    .groupBy('c_first_name', 'c_last_name') \
    .agg(sf.sum('sum_ss_quantity').alias('sum_sold'))

# Both results are grouped on fixed filter values, so each holds at most one row.
sql_rows = unknown_customer_sales_sql.collect()
df_rows = unknown_customer_sales_df.collect()
assert sql_rows == df_rows, f'SQL and DataFrame results differ: {sql_rows} vs {df_rows}'

unknown_customer_sales_df.show()

+------------+-----------+--------+
|c_first_name|c_last_name|sum_sold|
+------------+-----------+--------+
|     UNKNOWN|    UNKNOWN|  442607|
+------------+-----------+--------+

