In [1]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
        builder. \
        config('spark.ui.port','0'). \
        config("spark.sql.warehouse.dir", f"/user/itv013286/warehouse"). \
        enableHiveSupport(). \
        master('yarn'). \
        getOrCreate()

In [2]:
spark

### weneed to consider 3 datasets
#### orders
#### ========
#### order_id,order_date,order_customer_id,order_statuscustomers
#### ==========
#### customer_id,customer_fname,customer_lname,customer_email,customer_password,customer_street,customer_city,customer_state,customer_zipcodeorder_items
#### ==============
#### order_item_id,order_id,order_item_product_id,order_item_quantity,order_item_subtotal,order_item_product_price

#### Note-onecustomercanhavemultipleordersintheordersdataset.oneordercanhavemultipleorder_itemsintheorder_itemstable1.

#### 1.weneedtofindtop10customerswhohavespentthemostamount(premiumcustomers

In [3]:
orders_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/orders/*")

In [4]:
orders_rdd.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [5]:
orders_map = orders_rdd.map(lambda x :((int(x.split(",")[0]),int(x.split(",")[2]))))

In [6]:
orders_map.take(5)

[(1, 11599), (2, 256), (3, 12111), (4, 8827), (5, 11318)]

In [5]:
order_items_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/order_items/*")

In [6]:
order_items_map = order_items_rdd.map(lambda x :((int(x.split(",")[1])), float(x.split(",")[4]) ))

In [7]:
order_items_map.take(5)

[(1, 299.98), (2, 199.99), (2, 250.0), (2, 129.99), (4, 49.98)]

In [10]:
join_rdd = order_items_map.join(orders_map)

In [11]:
join_rdd.take(5)

[(4, (49.98, 8827)),
 (4, (299.95, 8827)),
 (4, (150.0, 8827)),
 (4, (199.92, 8827)),
 (8, (179.97, 2911))]

In [12]:
join_map = join_rdd.map(lambda x:(x[1][1],x[1][0]))


In [13]:
reduced_rdd=join_map.reduceByKey(lambda x,y:x+y).sortBy(lambda x:x[1],ascending=False)
reduced_rdd.take(10)


[(791, 10524.169999999996),
 (9371, 9299.029999999999),
 (8766, 9296.14),
 (1657, 9223.71),
 (2641, 9130.92),
 (1288, 9019.11),
 (3710, 9019.099999999999),
 (4249, 8918.85),
 (5654, 8904.95),
 (5624, 8761.98)]

#### 2.top10 product id's with most quantities sold

In [22]:
order_itemsmap = order_items_rdd.map(lambda x : (int(x.split(",")[2]),int(x.split(",")[3])))

In [23]:
order_itemsmap.take(5)

[(957, 1), (1073, 1), (502, 5), (403, 1), (897, 2)]

In [28]:
orders_item_reduce = order_itemsmap.reduceByKey(lambda x,y:x+y).sortBy(lambda x: x[1], ascending = False)

In [29]:
orders_item_reduce.take(5)

[(365, 73698), (502, 62956), (1014, 57803), (191, 36680), (627, 31735)]

### 3.how many customers are from Caguas city

In [8]:
customers_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/customers/*")

In [9]:
customers_rdd.take(5)

['1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521',
 '2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126',
 '3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,00725',
 '4,Mary,Jones,XXXXXXXXX,XXXXXXXXX,8324 Little Common,San Marcos,CA,92069',
 '5,Robert,Hudson,XXXXXXXXX,XXXXXXXXX,"10 Crystal River Mall ",Caguas,PR,00725']

In [34]:
customer_map = customers_rdd.map(lambda x : x.split(",")[6])

In [35]:
customer_map.take(5)

['Brownsville', 'Littleton', 'Caguas', 'San Marcos', 'Caguas']

In [36]:
customer_map.filter(lambda x : x == 'Caguas').count()

4584

#### Top3 states with maximum customers

In [42]:
customerstatemap = customers_rdd.map(lambda x : (x.split(",")[7],1))

In [43]:
customerstatemap.take(5)

[('TX', 1), ('CO', 1), ('PR', 1), ('CA', 1), ('PR', 1)]

In [46]:
customerreduce = customerstatemap.reduceByKey(lambda x,y : x+y).sortBy(lambda x : x[1], ascending = False)

In [47]:
customerreduce.take(3)

[('PR', 4771), ('CA', 2012), ('NY', 775)]

#### how many customers have spent more than $1000 in total

In [13]:
orders_map = orders_rdd.map(lambda x : (int(x.split(",")[0]),int(x.split(",")[2])))

In [14]:
orders_map.take(5)

[(1, 11599), (2, 256), (3, 12111), (4, 8827), (5, 11318)]

In [19]:
order_items_map = order_items_rdd.map(lambda x: (int(x.split(",")[1]),float(x.split(",")[4])))

In [20]:
order_items_map.take(5)

[(1, 299.98), (2, 199.99), (2, 250.0), (2, 129.99), (4, 49.98)]

In [21]:
join = order_items_map.join(orders_map)

In [22]:
join.take(5)

[(35212, (49.98, 8774)),
 (35212, (299.97, 8774)),
 (35212, (249.9, 8774)),
 (35212, (49.98, 8774)),
 (35212, (149.94, 8774))]

In [24]:
join_map = join.map(lambda x : (x[1][1],x[1][0]))

In [25]:
join_map.take(5)

[(8827, 49.98), (8827, 299.95), (8827, 150.0), (8827, 199.92), (2911, 179.97)]

In [26]:
joinreduce = join_map.reduceByKey(lambda x,y : x+y)

In [30]:
result = joinreduce.filter(lambda x :x[1]>1000)

In [31]:
result.count()

11148

#### which state has most number of orders in CLOSEDs tatus

In [36]:
orders_map = orders_rdd.map(lambda x : (int(x.split(",")[2]),x.split(",")[3])).filter(lambda x :x[1]=='CLOSED')

In [37]:
orders_map.take(5)

[(11599, 'CLOSED'),
 (8827, 'CLOSED'),
 (1837, 'CLOSED'),
 (1205, 'CLOSED'),
 (11441, 'CLOSED')]

In [42]:
customers_map = customers_rdd.map(lambda x:(int(x.split(',')[0]),x.split(',')[7]))

In [43]:
join = customers_map.join(orders_map)

In [44]:
join.take(5)

[(4, ('CA', 'CLOSED')),
 (8, ('MA', 'CLOSED')),
 (12, ('TX', 'CLOSED')),
 (16, ('PR', 'CLOSED')),
 (20, ('NJ', 'CLOSED'))]

In [50]:
join_map = join.map(lambda x: (x[1][0],1))

In [51]:
join_map.take(5)

[('CA', 1), ('MA', 1), ('TX', 1), ('PR', 1), ('NJ', 1)]

In [55]:
join_reduce = join_map.reduceByKey(lambda x,y : x+y).sortBy(lambda x :x[1], ascending = False)

In [56]:
join_reduce.take(5)

[('PR', 2891), ('CA', 1232), ('NY', 450), ('TX', 403), ('IL', 313)]

#### how many customers are active(active customers are the one's who placed atleast one order)

In [57]:
orders_map = orders_rdd.map(lambda x : (x.split(",")[2],1))

In [58]:
ordersreduce = orders_map.reduceByKey(lambda x,y : x+y)

In [60]:
ordersreduce.filter(lambda x : x[1]>=1).count()

12405

### What is the revenue generate dby each state in sorted order.

In [10]:
orders_map = orders_rdd.map(lambda x :(int(x.split(",")[2]),int(x.split(",")[0])))

In [11]:
customrt_map = customers_rdd.map(lambda x:(int(x.split(',')[0]),x.split(',')[7]))

In [12]:
join_rdd = customrt_map.join(orders_map)

In [13]:
join_rdd.take(5)

[(4, ('CA', 9023)),
 (4, ('CA', 9704)),
 (4, ('CA', 17253)),
 (4, ('CA', 37878)),
 (4, ('CA', 49339))]

In [14]:
join_map = join_rdd.map(lambda x : (x[1][0], x[1][1]))

In [15]:
join_map.take(5)

[('CA', 9023), ('CA', 9704), ('CA', 17253), ('CA', 37878), ('CA', 49339)]

In [25]:
order_items_map=order_items_rdd.map(lambda x:(int(x.split(',')[1]),float(x.split(',')[4])))

In [26]:
order_items_map.take(5)

[(1, 299.98), (2, 199.99), (2, 250.0), (2, 129.99), (4, 49.98)]

In [33]:
join_new = join_map.join(order_items_map)

In [None]:
reduced_rdd=join_new_rdd.map(lambda x:(x[1][0],x[1][1])).reduceByKey(lambda x,y:x+y)
final_rdd=reduced_rdd.sortBy(lambda x:x[1],ascending=False)
final_rdd.collect()
