In [0]:
import os
import pandas as pd
import numpy as np

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
from pyspark.sql.window import Window

import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

In [0]:
# Turn Adaptive Query Execution off, presumably so the physical plans and
# partition counts observed in the caching experiments below are not
# re-optimised at runtime.
spark.conf.set("spark.sql.adaptive.enabled", "false")

# Root folder of the retail parquet dataset; every input table lives directly
# under it, so the location only has to be changed in one place.
RETAIL_DIR = "/FileStore/retail"

customers_df = spark.read.parquet(f"{RETAIL_DIR}/customers.parquet")
departments_df = spark.read.parquet(f"{RETAIL_DIR}/departments.parquet")
categories_df = spark.read.parquet(f"{RETAIL_DIR}/categories.parquet")
products_df = spark.read.parquet(f"{RETAIL_DIR}/products.parquet")
orders_df = spark.read.parquet(f"{RETAIL_DIR}/orders.parquet")
order_items_df = spark.read.parquet(f"{RETAIL_DIR}/order_items.parquet")

# NOTE(review): this prints raw customer rows; if the table holds real PII,
# clear this output before sharing the notebook.
customers_df.show()

In [0]:
# Expose each input DataFrame to Spark SQL under its table name so the SQL
# query later in the notebook can refer to them directly.
temp_views = {
    "customers": customers_df,
    "departments": departments_df,
    "categories": categories_df,
    "products": products_df,
    "orders": orders_df,
    "order_items": order_items_df,
}
for view_name, view_df in temp_views.items():
    view_df.createOrReplaceTempView(view_name)


In [0]:
# Per order date: total line-item revenue divided by the number of distinct
# orders on that date. Despite the column name, this is the average revenue
# *per order* for each day, not an average taken over days.
# NOTE(review): unlike the category query further down, no order_status filter
# is applied, so CANCELED / SUSPECTED_FRAUD orders are included — confirm intent.
avg_rev_per_day = (orders_df
     .join(order_items_df, orders_df.order_id == order_items_df.order_item_order_id)
     .select('order_date', 'order_item_subtotal', 'order_item_order_id')
     .groupBy('order_date')
     .agg((F.sum('order_item_subtotal') / F.countDistinct('order_item_order_id')).alias('avg_rev_per_day'))
     .orderBy('order_date'))

# Aliases kept for later cells. They all reference the same lazy plan (and the
# same cache entry once persisted); they are not copies.
df = avg_rev_per_day
df1 = avg_rev_per_day
df2 = avg_rev_per_day

# Keep the result in executor memory only, so the actions below can reuse cached
# partitions instead of re-running the join and aggregation.
avg_rev_per_day.persist(pyspark.StorageLevel.MEMORY_ONLY)
print(avg_rev_per_day.storageLevel)

avg_rev_per_day.show()
# Written under the same absolute /FileStore/retail prefix the inputs are read
# from, and named after what it actually holds (one row per day).
avg_rev_per_day.write.mode("overwrite").parquet("/FileStore/retail/avg_rev_per_day")

In [0]:
# Release the cached per-day result (non-blocking, which is also the default).
# Anything derived from avg_rev_per_day afterwards, including via its alias df,
# recomputes it from the source tables.
avg_rev_per_day.unpersist(blocking=False)

In [0]:
# Mean of the per-day values for each calendar month, ordered by month number.
# NOTE(review): F.month() drops the year, so the same month from different years
# is pooled into one bucket — confirm a month-of-year view is what is wanted.
avg_rev_per_month = (
    avg_rev_per_day
    .withColumn('month', F.month('order_date'))
    .groupBy('month')
    .agg(F.avg('avg_rev_per_day').alias('avg_rev_per_month'))
    .orderBy('month')
)
avg_rev_per_month.count()


In [0]:
# The monthly aggregate has one row per month value, so collecting it to the
# driver with toPandas() is cheap.
monthly_rev_pd = avg_rev_per_month.toPandas()

# Single series, so the legend adds nothing; label both axes so the figure
# stands on its own when the notebook is skimmed.
ax = monthly_rev_pd.plot.bar(x='month', y='avg_rev_per_month',
                             title='Average Revenue Per Month', legend=False)
ax.set_xlabel('Month')
ax.set_ylabel('Avg. revenue per order');


In [0]:
import re

def strip_margin(text):
    """Collapse a ``|``-margined multi-line string into a single line.

    Each newline followed by optional spaces/tabs and a ``|`` margin marker is
    replaced by one space. Any remaining run of whitespace is then collapsed to
    a single space, and leading/trailing whitespace is removed. This lets SQL
    passed to ``spark.sql`` be written readably across several lines.

    Args:
        text: The margined string.

    Returns:
        The flattened single-line string.
    """
    # Raw strings: '\s' and '\|' in ordinary literals are invalid escape
    # sequences (DeprecationWarning, and SyntaxWarning since Python 3.12).
    no_margin = re.sub(r'\n[ \t]*\|', ' ', text)
    return re.sub(r'\s+', ' ', no_margin).strip()

In [0]:
#Get the revenue for each Category Per Year Per Quarter

# Line-item rows tagged with category name, order year and quarter label
# ('Q1'..'Q4'), excluding cancelled and suspected-fraud orders. No aggregation
# happens here; the pivot cell sums order_item_subtotal per category/year/quarter.
# Every column is table-qualified so the query cannot become ambiguous if the
# joined tables ever share a column name.
rev_cat_qt_df = spark.sql(strip_margin(
    """SELECT c.category_name,
      |       YEAR(o.order_date) AS order_year,
      |       CONCAT('Q', QUARTER(o.order_date)) AS order_quarter,
      |       oi.order_item_subtotal
      |FROM orders o
      |INNER JOIN order_items oi ON oi.order_item_order_id = o.order_id
      |INNER JOIN products p ON oi.order_item_product_id = p.product_id
      |INNER JOIN categories c ON p.product_category_id = c.category_id
      |WHERE o.order_status NOT IN ('CANCELED', 'SUSPECTED_FRAUD')"""))

# cache() is lazy; count() is the action that actually materialises the rows.
rev_cat_qt_df.cache()
rev_cat_qt_df.count()


In [0]:
# Drop the cached category/quarter rows (non-blocking, which is also the default).
# NOTE(review): the pivot that consumes rev_cat_qt_df runs after this cell, so it
# recomputes the joins instead of reading the cache — consider unpersisting
# after the pivot instead.
rev_cat_qt_df.unpersist(blocking=False)

In [0]:
# Pivot values given up front: Spark then skips the extra job it would otherwise
# run to discover the distinct order_quarter values.
quarter_labels = ['Q1', 'Q2', 'Q3', 'Q4']

# A category with no sales in some quarter gets NULL in that pivot column, so
# treat missing quarters as 0 before adding them up.
quarter_terms = [F.coalesce(col(label), F.lit(0)) for label in quarter_labels]
total_sales_expr = F.round(sum(quarter_terms[1:], quarter_terms[0]), 2)

# Revenue per category and year, one column per quarter, best-selling first.
rev_cat_qt_pivot_df = (
    rev_cat_qt_df
    .groupBy('category_name', 'order_year')
    .pivot('order_quarter', quarter_labels)
    .agg(F.round(F.sum('order_item_subtotal'), 2))
    .withColumn('total_sales', total_sales_expr)
    .orderBy(col('total_sales').desc())
)

rev_cat_qt_pivot_df.show()