In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
# Local Spark session (master 'local' = a single worker thread) with Hive support enabled;
# getOrCreate() returns the already-running session if there is one.
spark = (
    SparkSession.builder
    .master('local')
    .appName('aggDataFrames')
    .enableHiveSupport()
    .getOrCreate()
)
sc = spark.sparkContext  # underlying SparkContext handle

In [None]:
# retail_db extracts: one headered CSV per table; column types are inferred from the data.
RETAIL_DB_DIR = 'data/retail_db'


def _read_retail_table(name):
    """Load ``<RETAIL_DB_DIR>/<name>.csv`` as a DataFrame (header row, inferred schema)."""
    return spark.read.csv(f'{RETAIL_DB_DIR}/{name}.csv', header=True, inferSchema=True)


orders = _read_retail_table('orders')
order_items = _read_retail_table('order_items')
customers = _read_retail_table('customers')
products = _read_retail_table('products')
categories = _read_retail_table('categories')
departments = _read_retail_table('departments')

# The spark.sql(...) cells refer to these tables by name, and no other cell in this notebook
# registers them. Expose each DataFrame as a session-scoped temp view so the SQL answers run
# against the same data as the DataFrame answers (an unqualified name resolves to a temp view
# before any same-named table in the current database).
for _view_name, _frame in {
    'orders': orders,
    'order_items': order_items,
    'customers': customers,
    'products': products,
    'categories': categories,
    'departments': departments,
}.items():
    _frame.createOrReplaceTempView(_view_name)

# 1. Top N (N = 5) orders per day by order revenue

In [None]:
p11=orders.join(order_items, orders.order_id == order_items.order_item_order_id).groupBy(orders.order_date,orders.order_id). \
    agg(round(sum(order_items.order_item_subtotal),2).alias('order_revenue'))

In [None]:
p12=p11.withColumn('rnk',rank().over(Window.partitionBy(orders.order_date).orderBy(col('order_revenue').desc())))

In [None]:
p1=p12.filter(col('rnk') <= 5).drop('rnk').orderBy('order_date',col('order_revenue').desc())

In [None]:
p1.show()

### SQL

In [None]:
# Same question in Spark SQL: rank each day's orders by revenue and keep ranks 1-5.
# Assumes `orders` / `order_items` resolve as tables or temp views in this session.
top_orders_per_day_sql = (
    " SELECT order_date, order_id, order_revenue FROM "
    "(SELECT o.order_date, o.order_id, round(sum(oi.order_item_subtotal),2) order_revenue, "
    "RANK() OVER (PARTITION BY o.order_date ORDER BY round(sum(oi.order_item_subtotal),2) DESC) rnk  "
    "FROM orders o, order_items oi WHERE o.order_id = oi.order_item_order_id GROUP BY  o.order_date, o.order_id ) main WHERE rnk<= 5 ORDER BY 1, 3 DESC"
)
spark.sql(top_orders_per_day_sql).show()


# 2. Average order revenue per day, and all orders whose revenue is at or above that day's average

In [None]:
p21=orders.join(order_items, orders.order_id == order_items.order_item_order_id).groupBy('order_date','order_id').\
agg(round(sum('order_item_subtotal'),2).alias('order_revenue'))

In [None]:
p22=p21.withColumn('avg_revenue',avg('order_revenue').over(Window.partitionBy('order_date')))

In [None]:
p2=p22.filter('order_revenue >= avg_revenue').orderBy('order_date',col('order_revenue').desc())

In [None]:
p2.show()

### SQL

In [None]:
# Same question in Spark SQL (assumes `orders` / `order_items` resolve as tables or temp views).
# order_revenue is rounded to 2 decimals, exactly as in the DataFrame version, so both answers
# display and compare the same per-order values against the daily average.
spark.sql("""
    SELECT order_date, order_id, order_revenue, avg_revenue
    FROM (
        SELECT o.order_date,
               o.order_id,
               round(sum(oi.order_item_subtotal), 2) AS order_revenue,
               avg(round(sum(oi.order_item_subtotal), 2)) OVER (PARTITION BY o.order_date) AS avg_revenue
        FROM orders o
        JOIN order_items oi ON o.order_id = oi.order_item_order_id
        GROUP BY o.order_date, o.order_id
    ) daily
    WHERE order_revenue >= avg_revenue
    ORDER BY 1, 3 DESC
""").show()

# 3. Top N (N = 5) products per day by revenue

In [None]:
p31=orders.join(order_items, orders.order_id == order_items.order_item_order_id).join(products, order_items.order_item_product_id == products.product_id).groupBy('order_date','product_name')

In [None]:
p32=p31.agg(round(sum('order_item_subtotal'),2).alias('product_revenue')).withColumn('rnk',rank().over(Window.partitionBy('order_date').orderBy(col('product_revenue').desc())))

In [None]:
p33=p32.filter(col('rnk') <=5).orderBy('order_date',col('product_revenue').desc())

In [None]:
p3=p33

In [None]:
p3.show()

### SQL

In [None]:
# Same question in Spark SQL; the rank column is returned too.
# Assumes `orders` / `order_items` / `products` resolve as tables or temp views in this session.
top_products_per_day_sql = (
    'SELECT * FROM '
    '( SELECT o.order_date, p.product_name, round(sum(oi.order_item_subtotal),2) product_revenue, '
    'RANK() OVER(PARTITION BY o.order_date ORDER BY round(sum(oi.order_item_subtotal),2) DESC) rnk '
    'from orders o, order_items oi, products p WHERE o.order_id = oi.order_item_order_id AND oi.order_item_product_id = p.product_id '
    'GROUP BY o.order_date, p.product_name ) WHERE rnk <=5 ORDER BY 1,3 DESC'
)
spark.sql(top_products_per_day_sql).show()


# 4. Percentage contribution of each order item to its order's revenue


In [None]:
p41=order_items.withColumn('order_revenue',sum('order_item_subtotal').over(Window.partitionBy('order_item_order_id'))).\
withColumn('pcnt', round(col('order_item_subtotal')/col('order_revenue'),2))

In [None]:
p42=p41.select('order_item_order_id','order_item_id','order_item_subtotal','order_revenue','pcnt').orderBy(col('order_item_order_id'))

In [None]:
p4=p42

In [None]:
p4.show()

### SQL

In [None]:
# Same question in Spark SQL (assumes `order_items` resolves as a table or temp view).
# order_revenue is rounded for display, while pcnt divides by the unrounded window sum.
item_share_sql = (
    "SELECT order_item_order_id, order_item_id, order_item_subtotal, "
    "round(SUM(order_item_subtotal) OVER(PARTITION BY order_item_order_id),2) order_revenue , "
    "(round(order_item_subtotal /SUM(order_item_subtotal) OVER(PARTITION BY order_item_order_id),2)) pcnt FROM order_items ORDER BY 1 "
)
spark.sql(item_share_sql).show()



# 5. Difference between the subtotals of the top 2 items in each order

In [None]:
p51= order_items.withColumn('rnk',rank().over(Window.partitionBy('order_item_order_id').orderBy(col('order_item_subtotal').desc()))).withColumn('next',lead('order_item_subtotal').over(Window.partitionBy('order_item_order_id').orderBy(col('order_item_subtotal').desc())))

In [None]:
p52=p51.select('order_item_order_id','order_item_id','order_item_subtotal','next','rnk')

In [None]:
p53=p52.orderBy('order_item_order_id','rnk').filter(col('rnk')==1).fillna(0).withColumn('diff',round(col('order_item_subtotal').cast('double')-col('next').cast('double'),2)).drop('rnk')

In [None]:
p5=p53

In [None]:
p5.show()

### SQL

In [None]:
# Same question in Spark SQL (assumes `order_items` resolves as a table or temp view).
# ROW_NUMBER with order_item_id as tie-breaker returns exactly one top row per order (RANK would
# return every tied item). A missing second item counts as 0 and diff is rounded to 2 decimals,
# matching the DataFrame version's fillna(0) and round(..., 2).
spark.sql("""
    SELECT s.*, round(s.order_item_subtotal - s.next_item, 2) AS diff
    FROM (
        SELECT order_item_order_id, order_item_id, order_item_subtotal,
               coalesce(LEAD(order_item_subtotal) OVER (PARTITION BY order_item_order_id
                                                        ORDER BY order_item_subtotal DESC, order_item_id), 0) AS next_item,
               ROW_NUMBER() OVER (PARTITION BY order_item_order_id
                                  ORDER BY order_item_subtotal DESC, order_item_id) AS rnk
        FROM order_items
    ) s
    WHERE rnk = 1
    ORDER BY 1
""").show()


# 6. Order items contributing 75% or more of their order's total revenue

In [None]:
p61 = order_items.withColumn('order_revenue',sum('order_item_subtotal').over(Window.partitionBy('order_item_order_id'))).\
withColumn('pcnt',round(col('order_item_subtotal')/col('order_revenue'),2))

In [None]:
p62= p61.filter(col('pcnt') >= 0.75).drop('order_item_product_id','order_item_quantity','order_item_product_price').orderBy('order_item_order_id')

In [None]:
p6=p62

In [None]:
p6.show()

### SQL


In [None]:
# Same question in Spark SQL (assumes `order_items` resolves as a table or temp view).
# The 75% threshold is applied to the exact share; order_revenue and pcnt are rounded only for
# display, because filtering on a HALF_UP-rounded pcnt would admit items just under 75%.
spark.sql("""
    SELECT order_item_id, order_item_order_id, order_item_subtotal,
           round(order_revenue, 2) AS order_revenue,
           round(order_item_subtotal / order_revenue, 2) AS pcnt
    FROM (
        SELECT order_item_id, order_item_order_id, order_item_subtotal,
               sum(order_item_subtotal) OVER (PARTITION BY order_item_order_id) AS order_revenue
        FROM order_items
    ) s
    WHERE order_item_subtotal / order_revenue >= 0.75
    ORDER BY 1
""").show()


# 7. What are the best-selling and second-best-selling products (by revenue) in every category?

In [None]:
p71 = order_items.join(products, order_items.order_item_product_id == products.product_id).join(categories, categories.category_id == products.product_category_id).groupBy('category_name','product_name').agg(round(sum(order_items.order_item_subtotal),2).alias('product_revenue'))

In [None]:
p72=p71.withColumn('rnk',rank().over(Window.partitionBy('category_name').orderBy(col('product_revenue').desc()))).filter(col('rnk') <=2).orderBy('category_name','rnk')

In [None]:
p7=p72

In [None]:
p7.show()

### SQL

In [None]:
# Same question in Spark SQL (assumes `order_items` / `products` / `categories` resolve as tables
# or temp views). The window orders by round(..., 2) -- the same value shown as product_revenue --
# so ranks agree with the DataFrame version; round() with no scale rounds to whole units and
# would tie distinct products.
spark.sql("""
    SELECT * FROM (
        SELECT c.category_name, p.product_name,
               round(sum(oi.order_item_subtotal), 2) AS product_revenue,
               rank() OVER (PARTITION BY c.category_name
                            ORDER BY round(sum(oi.order_item_subtotal), 2) DESC) AS rnk
        FROM order_items oi
        JOIN products p ON oi.order_item_product_id = p.product_id
        JOIN categories c ON p.product_category_id = c.category_id
        GROUP BY c.category_name, p.product_name
    ) ranked
    WHERE rnk <= 2
    ORDER BY 1, 4
""").show()

# 8. What is the difference between the revenue of each product and the revenue of the best-selling product in the same category of that product?

In [None]:
p8=order_items.join(products, products.product_id == order_items.order_item_product_id).\
join(categories, categories.category_id ==  products.product_category_id).\
groupBy('category_name','product_name').agg(round(sum('order_item_subtotal'),2).alias('product_revenue')).\
withColumn('best',max(col('product_revenue')).over(Window.partitionBy('category_name'))).withColumn('diff_prices',col('best')-col('product_revenue')).\
orderBy('category_name','product_revenue')

In [None]:
p8.show()

### SQL

In [None]:
# Same question in Spark SQL (assumes `categories` / `products` / `order_items` resolve as tables
# or temp views): per-product revenue next to the category maximum, and their difference.
gap_to_best_sql = (
    "SELECT s.*, best_product - product_revenue diff_prices "
    "FROM (SELECT c.category_name, p.product_name, ROUND(SUM(oi.order_item_subtotal),2) product_revenue, "
    "MAX(ROUND(SUM(oi.order_item_subtotal),2)) OVER(PARTITION BY c.category_name) best_product "
    "FROM categories c, products p , order_items oi "
    "WHERE oi.order_item_product_id = p.product_id AND p.product_category_id = c.category_id "
    "GROUP BY c.category_name, p.product_name) s ORDER BY 1,3"
)
spark.sql(gap_to_best_sql).show()



# 9. Best-selling product by quantity (not by revenue) for every month from July 2013 to July 2014

In [None]:
p91=orders.join(order_items, orders.order_id == order_items.order_item_order_id).join(products, products.product_id == order_items.order_item_product_id).withColumn('date_month',date_format(orders.order_date,'yyyyMM').cast('bigint')).filter(col('date_month').between(201307,201407))

In [None]:
p92=p91.groupBy(col('date_month'),'product_name').agg(sum(order_items.order_item_quantity).alias('order_quantity'))

In [None]:
p93=p92.withColumn('rnk',rank().over(Window.partitionBy(col('date_month')).orderBy(col('order_quantity').desc()))).filter(col('rnk')==1).orderBy(col('date_month'))

In [None]:
p9=p93
p9.show()

### SQL

In [None]:
# Same question in Spark SQL (assumes `orders` / `order_items` / `products` resolve as tables or
# temp views). Months are restricted to July 2013 - July 2014 inclusive, as the question asks and
# as the DataFrame version does.
spark.sql("""
    SELECT * FROM (
        SELECT date_format(o.order_date, 'yyyyMM') AS order_month,
               p.product_name,
               sum(oi.order_item_quantity) AS order_quantity,
               rank() OVER (PARTITION BY date_format(o.order_date, 'yyyyMM')
                            ORDER BY sum(oi.order_item_quantity) DESC) AS rnk
        FROM orders o
        JOIN order_items oi ON oi.order_item_order_id = o.order_id
        JOIN products p ON oi.order_item_product_id = p.product_id
        WHERE CAST(date_format(o.order_date, 'yyyyMM') AS INT) BETWEEN 201307 AND 201407
        GROUP BY date_format(o.order_date, 'yyyyMM'), p.product_name
    ) s
    WHERE rnk = 1
    ORDER BY 1
""").show()

# 10. Who are the top 10 revenue-generating customers?

In [None]:
# Ten customers with the highest total item revenue. The concatenated first + last name is
# aliased so the output column reads 'customer_name' instead of the generated expression text,
# and revenue is rounded to 2 decimals like the other per-entity totals.
(
    orders
    .join(order_items, orders.order_id == order_items.order_item_order_id)
    .join(customers, customers.customer_id == orders.order_customer_id)
    .groupBy(
        customers.customer_id,
        concat_ws(' ', customers.customer_fname, customers.customer_lname).alias('customer_name'),
    )
    .agg(round(sum(order_items.order_item_subtotal), 2).alias('customer_revenue'))
    .orderBy(col('customer_revenue').desc())
    .limit(10)
    .show()
)

# 11. What are the top 10 revenue-generating products?

In [None]:
# Ten products with the highest total item revenue (rounded to 2 decimals).
(
    orders
    .join(order_items, orders.order_id == order_items.order_item_order_id)
    .join(products, products.product_id == order_items.order_item_product_id)
    .groupBy('product_id', 'product_name')
    .agg(round(sum('order_item_subtotal'), 2).alias('product_revenue'))
    .orderBy(col('product_revenue').desc())
    .limit(10)
    .show()
)

# 12. Top 5 revenue-generating departments

In [None]:
# Five departments with the highest item revenue: item -> product -> category -> department.
(
    order_items
    .join(products, products.product_id == order_items.order_item_product_id)
    .join(categories, categories.category_id == products.product_category_id)
    .join(departments, departments.department_id == categories.category_department_id)
    .groupBy('department_id', 'department_name')
    .agg(round(sum('order_item_subtotal'), 2).alias('dept_revenue'))
    .orderBy(col('dept_revenue').desc())
    .limit(5)
    .show()
)

# 13. Top 5 revenue-generating cities (from customers' addresses)

In [None]:
# Five customer cities with the highest item revenue.
# NOTE(review): grouping is by city name only, so same-named cities in different regions are
# combined; consider also grouping by a state/region column if the customers table has one.
(
    orders
    .join(order_items, orders.order_id == order_items.order_item_order_id)
    .join(customers, customers.customer_id == orders.order_customer_id)
    .groupBy('customer_city')
    .agg(round(sum('order_item_subtotal'), 2).alias('city_revenue'))
    .orderBy(col('city_revenue').desc())
    .limit(5)
    .show()
)