In [0]:
import pyspark
import pyspark.sql.functions
from pyspark.sql.functions import count, sum as _sum, asc, desc, col, countDistinct
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Pyspark').getOrCreate()

In [0]:
filepath = 'dbfs:/FileStore/tables/'

customer_dim = spark.read.csv(filepath + "customer_dim.csv", header = True, inferSchema = True)
item_dim = spark.read.csv(filepath + "item_dim.csv", header = True, inferSchema = True)
store_dim = spark.read.csv(filepath + "store_dim.csv", header = True, inferSchema = True)
time_dim = spark.read.csv(filepath + "time_dim.csv", header = True, inferSchema = True)
trans_dim = spark.read.csv(filepath + "Trans_dim.csv", header = True, inferSchema = True)
fact_sales = spark.read.csv(filepath + "fact_table-1.csv", header = True, inferSchema = True)


## Q4.1. Top 10 highest selling items

In [0]:
result = (fact_sales.alias("f")
    .join(item_dim.alias("i"), col("f.item_key") == col("i.item_key"))
    .groupBy("item_name")
    .agg(_sum(col("total_price").cast("integer")).alias("total_sales_amount"))
    .orderBy(desc("total_sales_amount"))
    .limit(10)
)

result.show()


+--------------------+------------------+
|           item_name|total_sales_amount|
+--------------------+------------------+
|       Red Bull 12oz|           1305700|
|K Cups Daily Chef...|           1245394|
|K Cups Original D...|           1188843|
|K Cups Dunkin Don...|           1109760|
|Muscle Milk Prote...|           1050924|
|K Cups Folgers Li...|           1042406|
|     Honey Packets  |           1012995|
|K Cups � Starbuck...|            995456|
|K Cups �Organic B...|            957516|
|K Cups - McCafe P...|            956886|
+--------------------+------------------+



## Q4.2. Top 10 items with the highest number of quantities sold


In [0]:
result = (
    fact_sales
    .join(item_dim, fact_sales["item_key"] == item_dim["item_key"], "inner")
    .groupBy("item_name")
    .agg(_sum(col("quantity").cast("int")).alias("total_quantity_sold"))
    .orderBy(desc("total_quantity_sold"))
    .limit(10)
)

result.show()

+--------------------+-------------------+
|           item_name|total_quantity_sold|
+--------------------+-------------------+
|  Pepsi - 12 oz cans|              46837|
|Muscle Milk Prote...|              45665|
|Coke Classic 12 o...|              45501|
|Diet Coke - 12 oz...|              45202|
| Sprite - 12 oz cans|              45140|
|Diet Pepsi - 12 o...|              23969|
|Nat.Valley Peanut...|              23958|
|Nabisco Classic M...|              23852|
|       Red Bull 12oz|              23740|
|Cascade Gel Packs...|              23648|
+--------------------+-------------------+



## Q4.3. Top 5 manufacturing country by quantity sold and ordered by total sales

In [0]:
result = (
    fact_sales
    .join(item_dim, fact_sales["item_key"] == item_dim["item_key"], "inner")
    .groupBy("man_country")
    .agg(
        _sum(col("quantity").cast("int")).alias("total_quantity_sold"),
        _sum(col("total_price").cast("int")).alias("total_sales_amount")
    )
    .orderBy(desc("total_sales_amount"))
    .limit(5)
)

result.show()


+-----------+-------------------+------------------+
|man_country|total_quantity_sold|total_sales_amount|
+-----------+-------------------+------------------+
| Bangladesh|             772031|          13332668|
|      India|             730625|          13151006|
|  Lithuania|             592105|          11739725|
|     poland|             635611|          10966500|
|    Germany|             699848|          10948917|
+-----------+-------------------+------------------+



## Q4.4.	Year, quarter and month with highest sales

In [0]:
result = (
    fact_sales
    .join(time_dim, fact_sales["time_key"] == time_dim["time_key"], "inner")
    .groupBy("year", "quarter", "month")
    .agg(
        _sum(col("total_price").cast("float")).alias("total_sales_amount")
    )
    .orderBy(
        asc("total_sales_amount"),
        asc("year"),
        asc("quarter"),
        asc("month")
    )
)
result.show()


+----+-------+-----+------------------+
|year|quarter|month|total_sales_amount|
+----+-------+-----+------------------+
|2014|     Q1|    1|         496549.25|
|2021|     Q1|    1|         883772.25|
|2014|     Q1|    2|         1122547.0|
|2020|     Q1|    2|         1128964.0|
|2017|     Q1|    2|         1144129.5|
|2018|     Q1|    2|         1148761.5|
|2019|     Q1|    2|         1149508.5|
|2015|     Q1|    2|        1157159.75|
|2019|     Q2|    6|         1158766.0|
|2016|     Q2|    6|        1193281.75|
|2015|     Q2|    4|         1193564.0|
|2018|     Q2|    4|         1197510.0|
|2020|     Q4|   11|        1200248.75|
|2019|     Q2|    4|         1201298.0|
|2016|     Q1|    1|         1202031.5|
|2019|     Q4|   12|         1206605.5|
|2016|     Q3|    8|         1208266.0|
|2020|     Q2|    4|         1211492.0|
|2014|     Q4|   11|        1211569.75|
|2016|     Q4|   10|         1211723.5|
+----+-------+-----+------------------+
only showing top 20 rows



## Q4.5. Number of different transaction types by customer 

In [0]:
result = (
    fact_sales
    .join(trans_dim, fact_sales["payment_key"] == trans_dim["payment_key"], "inner")
    .join(customer_dim, fact_sales["coustomer_key"] == customer_dim["coustomer_key"], "inner")
    .groupBy(fact_sales["coustomer_key"], customer_dim["name"])
    .agg(
        countDistinct(trans_dim["trans_type"]).alias("count_used_trans_type")
    )
    .orderBy(
        asc(fact_sales["coustomer_key"])
    )
    .filter(col("count_used_trans_type") > 1)
)

result.show()


+-------------+----------------+---------------------+
|coustomer_key|            name|count_used_trans_type|
+-------------+----------------+---------------------+
|      C000001|           sumit|                    3|
|      C000002|        tammanne|                    3|
|      C000003|   kailash kumar|                    3|
|      C000004| bhagwati prasad|                    3|
|      C000005|            ajay|                    3|
|      C000006|        silender|                    3|
|      C000007|          deepak|                    3|
|      C000008|        akhilesh|                    3|
|      C000009|  dipendra kumar|                    3|
|      C000010|           nitin|                    3|
|      C000011|doodhnath pandit|                    3|
|      C000012|     aslam allam|                    3|
|      C000013|           rahul|                    2|
|      C000014|  jitender kumar|                    3|
|      C000015|           adnan|                    3|
|      C00