In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window as W
import pyspark.sql.functions as f
from pyspark.sql.types import *

import getpass

username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port','0'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
appName('week4'). \
getOrCreate()

#### Top 10 Premium Customer

In [22]:
#reading files
orders=spark.sparkContext.textFile("/public/trendytech/retail_db/orders")
order_items=spark.sparkContext.textFile("/public/trendytech/retail_db/order_items")
customers=spark.sparkContext.textFile("/public/trendytech/retail_db/customers")

In [45]:
#Preparing dataset
order_items_mapped=order_items.map(lambda x: (x.split(',')[1],float(x.split(',')[4])))
orders_mapped=orders.map(lambda x: (x.split(',')[0],x.split(',')[2]))
orders_rdd=orders_mapped.join(order_items_mapped)

In [46]:
orders_rdd.take(5)

[('34566', ('3066', 250.0)),
 ('34566', ('3066', 179.97)),
 ('34577', ('7733', 299.98)),
 ('34577', ('7733', 299.95)),
 ('34577', ('7733', 299.98))]

In [47]:
#Now taking sum of all orders against customers
cust_orders=orders_rdd.map(lambda x: (x[1][0],x[1][1]))
cust_orders.take(5)

[('8827', 49.98),
 ('8827', 299.95),
 ('8827', 150.0),
 ('8827', 199.92),
 ('5648', 199.99)]

In [48]:
cust_tot_orders=cust_orders.reduceByKey(lambda x,y: x+y)

In [49]:
#getting customer details
customer_mapped=customers.map(lambda x: (x.split(",")[0],x.split(",")[1]))

In [50]:
#Join based on customer id
final_rdd=cust_orders.join(customer_mapped)
final_rdd.take(5)

[('1737', (129.99, 'Mary')),
 ('1737', (129.99, 'Mary')),
 ('1737', (50.0, 'Mary')),
 ('1737', (129.99, 'Mary')),
 ('1737', (399.98, 'Mary'))]

In [51]:
#Premium Customer
final_rdd_sorted=final_rdd.sortBy(lambda x: x[1][0],ascending=False).map(lambda x: (x[1][1],x[1][0]))
final_rdd_sorted.take(10)

[('Matthew', 1999.99),
 ('Mary', 1999.99),
 ('Mary', 1999.99),
 ('Amber', 1999.99),
 ('Charles', 1999.99),
 ('Daniel', 1999.99),
 ('Mary', 1999.99),
 ('Teresa', 1999.99),
 ('Phillip', 1999.99),
 ('Helen', 1999.99)]

#### 2.top 10 product id's with most quantities sold

In [53]:
order_items=spark.sparkContext.textFile("/public/trendytech/retail_db/order_items")
orders_map=order_items.map(lambda x: (x.split(',')[2],int(x.split(',')[3])))
prod_quant=orders_map.reduceByKey(lambda x,y: x+y)

In [54]:
prod_quant.sortBy(lambda x: x[1],ascending=False).take(10)

[('365', 73698),
 ('502', 62956),
 ('1014', 57803),
 ('191', 36680),
 ('627', 31735),
 ('403', 22246),
 ('1004', 17325),
 ('1073', 15500),
 ('957', 13729),
 ('977', 998)]

##### 3. how many customers are from Caguas city

In [4]:
#understanding the data
cust_cities=customers.map(lambda x: x.split(',')[6])
cust_cag=cust_cities.filter(lambda x: x=='Caguas')

In [6]:
cust_cag.count()

4584

#### 4. Top 3 states with maximum customers

In [7]:
#understanding the data
cust_states=customers.map(lambda x: (x.split(',')[7],1))
state_wise_cust=cust_states.reduceByKey(lambda x,y: x+y)
state_wise_cust_sorted= state_wise_cust.sortBy(lambda x: x[1],ascending=False)
state_wise_cust_sorted.take(5)

[('PR', 4771), ('CA', 2012), ('NY', 775), ('TX', 635), ('IL', 523)]

#### 5. how many customers have spent more than $1000 in total

In [24]:
#Finding order_ids where total is greater than 1000
order_subtotal=order_items.map(lambda x: (x.split(',')[1],float(x.split(',')[4])))
order_total=order_subtotal.reduceByKey(lambda x,y: x+y)
order_total.take(2)
#Filtering records 
orders_filtered=order_total.filter(lambda x: x[1]>1000 )
orders_filtered.take(2)

[('35214', 1319.8500000000001), ('35230', 1249.79)]

In [16]:
orders.take(2)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT']

In [26]:
#Getting customer ID
orders_mapped=orders.map(lambda x: (x.split(',')[0],x.split(',')[2]))
order_cust=orders_filtered.join(orders_mapped).map(lambda x: x[1][1])
order_cust.distinct().count()

5655

#### 6. which state has most number of orders in CLOSED status

In [27]:
#Filtering 
orders_mapped=orders.map(lambda x: (x.split(',')[2],x.split(',')[3]))
orders_clsd=orders_mapped.filter(lambda x: x[1]=='CLOSED')
#mapping customers
cust_states=customers.map(lambda x: (x.split(',')[0],x.split(',')[7]))
orders_cust=orders_clsd.join(cust_states).cache()
orders_cust.take(5)

[('5834', ('CLOSED', 'PR')),
 ('5834', ('CLOSED', 'PR')),
 ('10173', ('CLOSED', 'PA')),
 ('6000', ('CLOSED', 'PR')),
 ('1352', ('CLOSED', 'MA'))]

In [29]:
state_clsd=orders_cust.map(lambda x: (x[1][1],1))
state_clsd_tot=state_clsd.reduceByKey(lambda x,y:x+y)
state_clsd_tot=state_clsd_tot.sortBy(lambda x:x[1],ascending=False)
state_clsd_tot.take(1)

[('PR', 2891)]

#### 7. how many customers are active (active customers are the one's who placed atleast one order)

In [30]:
orders_mapped=orders.map(lambda x: x.split(',')[2])
orders_mapped.distinct().count()


12405

#### 8. What is the revenue generated by each state in sorted order.

In [31]:
#order wise revenue
order_subtotal=order_items.map(lambda x: (x.split(',')[1],float(x.split(',')[4])))
order_total=order_subtotal.reduceByKey(lambda x,y: x+y)

#getting customer id
orders_mapped=orders.map(lambda x: (x.split(',')[0],x.split(',')[2]))
orders_cust=order_total.join(orders_mapped)

orders_cust.take(5)

[('34566', (429.97, '3066')),
 ('34577', (1039.88, '7733')),
 ('34583', (414.94, '1558')),
 ('34595', (129.99, '7248')),
 ('34599', (657.9300000000001, '12328'))]

In [32]:
orders_cust=orders_cust.map(lambda x: (x[1][1],x[1][0]))
#getting state
cust_states=customers.map(lambda x: (x.split(',')[0],x.split(',')[7]))
orders_state=orders_cust.join(cust_states)
orders_state.cache()

PythonRDD[77] at RDD at PythonRDD.scala:53

In [33]:
orders_state.take(5)

[('6280', (519.94, 'PR')),
 ('6280', (1279.88, 'PR')),
 ('6280', (1014.96, 'PR')),
 ('6280', (559.9, 'PR')),
 ('1718', (266.0, 'NY'))]

In [35]:
state_orders=orders_state.map(lambda x:( x[1][1],x[1][0]))
state_orders=state_orders.reduceByKey(lambda x,y: x+y)
state_orders_sort=state_orders.sortBy(lambda x:x[1],ascending=False)
state_orders_sort.take(5)

[('PR', 13208867.68999965),
 ('CA', 5542722.999999965),
 ('NY', 2152706.739999999),
 ('TX', 1731407.4900000014),
 ('IL', 1457225.8300000015)]

In [None]:
!git remote add origin git@github.com:Sbanerjee3219/Trendytech-Assignments.git
# git branch -M main
# git push -u origin main

In [20]:
spark.stop()