In [1]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession.\
builder.\
config('spark.ui.port','0').\
config("spark.sql.warehouse.dir",f"/user/itv012676/warehouse").\
enableHiveSupport().\
master('yarn').\
getOrCreate()


In [2]:
spark

In [3]:
orders_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/orders/*")

The data is in the form of 1,2013-07-25 00:00:00.0,11599,CLOSED

Indicates order id, time stamp, customer id and order status

In [4]:
orders_rdd.collect() #collect() displays entire data

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE',
 '6,2013-07-25 00:00:00.0,7130,COMPLETE',
 '7,2013-07-25 00:00:00.0,4530,COMPLETE',
 '8,2013-07-25 00:00:00.0,2911,PROCESSING',
 '9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT',
 '10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT',
 '11,2013-07-25 00:00:00.0,918,PAYMENT_REVIEW',
 '12,2013-07-25 00:00:00.0,1837,CLOSED',
 '13,2013-07-25 00:00:00.0,9149,PENDING_PAYMENT',
 '14,2013-07-25 00:00:00.0,9842,PROCESSING',
 '15,2013-07-25 00:00:00.0,2568,COMPLETE',
 '16,2013-07-25 00:00:00.0,7276,PENDING_PAYMENT',
 '17,2013-07-25 00:00:00.0,2667,COMPLETE',
 '18,2013-07-25 00:00:00.0,1205,CLOSED',
 '19,2013-07-25 00:00:00.0,9488,PENDING_PAYMENT',
 '20,2013-07-25 00:00:00.0,9198,PROCESSING',
 '21,2013-07-25 00:00:00.0,2711,PENDING',
 '22,2013-07-25 00:00:00.0,333,COMPLETE',
 '23,2013-07-25 00

In [5]:
orders_rdd.take(5) #displays top 5 records

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

1. Find the no. of orders in each category (no. of closed orders, no. of canceled orders etc.,)

In [6]:
orders_rdd_map = orders_rdd.map(lambda x:(x.split(",")[3],1))

In [7]:
orders_rdd_map.take(5)

[('CLOSED', 1),
 ('PENDING_PAYMENT', 1),
 ('COMPLETE', 1),
 ('CLOSED', 1),
 ('COMPLETE', 1)]

In [8]:
orders_rdd_reduce = orders_rdd_map.reduceByKey(lambda x,y : x+y)

In [9]:
orders_rdd_reduce.collect()

[('CLOSED', 7556),
 ('CANCELED', 1428),
 ('PENDING_PAYMENT', 15030),
 ('COMPLETE', 22899),
 ('PROCESSING', 8275),
 ('PAYMENT_REVIEW', 729),
 ('PENDING', 7610),
 ('ON_HOLD', 3798),
 ('SUSPECTED_FRAUD', 1558)]

In [10]:
sorted_rdd = orders_rdd_reduce.sortBy(lambda x:x[1],False)

In [11]:
sorted_rdd.collect()

[('COMPLETE', 22899),
 ('PENDING_PAYMENT', 15030),
 ('PROCESSING', 8275),
 ('PENDING', 7610),
 ('CLOSED', 7556),
 ('ON_HOLD', 3798),
 ('SUSPECTED_FRAUD', 1558),
 ('CANCELED', 1428),
 ('PAYMENT_REVIEW', 729)]

2. Find the premium customers (Top 10 who placed the most number of orders)

In [12]:
orders_rdd_map2 = orders_rdd.map(lambda x:(x.split(",")[2],1))

In [13]:
orders_rdd_reduce2 = orders_rdd_map2.reduceByKey(lambda x,y:x+y)

In [14]:
orders_rdd_reduce2.take(5)

[('256', 10), ('12111', 6), ('11318', 6), ('7130', 7), ('2911', 6)]

In [15]:
orders_rdd_sorted = orders_rdd_reduce2.sortBy(lambda x:x[1],False)


In [16]:
orders_rdd_sorted.take(10)

[('5897', 16),
 ('6316', 16),
 ('12431', 16),
 ('569', 16),
 ('4320', 15),
 ('221', 15),
 ('5624', 15),
 ('5283', 15),
 ('12284', 15),
 ('5654', 15)]

3. Distinct Count of Customers Who Placed atleast One Order

In [17]:
orders_rdd_map3 = orders_rdd.map(lambda x:x.split(",")[2])

In [18]:
distinct_customers = orders_rdd_map3.distinct()

In [19]:
distinct_customers.count()

12405

4. Which Customers had the maximum closed orders

In [20]:
orders_rdd_filter = orders_rdd.filter(lambda x:(x.split(",")[3] == 'CLOSED'))

In [21]:
orders_rdd_filter.collect()

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '12,2013-07-25 00:00:00.0,1837,CLOSED',
 '18,2013-07-25 00:00:00.0,1205,CLOSED',
 '24,2013-07-25 00:00:00.0,11441,CLOSED',
 '25,2013-07-25 00:00:00.0,9503,CLOSED',
 '37,2013-07-25 00:00:00.0,5863,CLOSED',
 '51,2013-07-25 00:00:00.0,12271,CLOSED',
 '57,2013-07-25 00:00:00.0,7073,CLOSED',
 '61,2013-07-25 00:00:00.0,4791,CLOSED',
 '62,2013-07-25 00:00:00.0,9111,CLOSED',
 '87,2013-07-25 00:00:00.0,3065,CLOSED',
 '90,2013-07-25 00:00:00.0,9131,CLOSED',
 '101,2013-07-25 00:00:00.0,5116,CLOSED',
 '116,2013-07-26 00:00:00.0,8763,CLOSED',
 '129,2013-07-26 00:00:00.0,9937,CLOSED',
 '133,2013-07-26 00:00:00.0,10604,CLOSED',
 '191,2013-07-26 00:00:00.0,16,CLOSED',
 '201,2013-07-26 00:00:00.0,9055,CLOSED',
 '211,2013-07-26 00:00:00.0,10372,CLOSED',
 '213,2013-07-26 00:00:00.0,11715,CLOSED',
 '215,2013-07-26 00:00:00.0,5925,CLOSED',
 '221,2013-07-26 00:00:00.0,11871,CLOSED',
 '222,2013-07-26 00:00:00.0,8309,CLOSED',
 '

In [22]:
orders_rdd_filter.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '12,2013-07-25 00:00:00.0,1837,CLOSED',
 '18,2013-07-25 00:00:00.0,1205,CLOSED',
 '24,2013-07-25 00:00:00.0,11441,CLOSED']

In [23]:
orders_rdd_map4 = orders_rdd_filter.map(lambda x:(x.split(",")[2],1))

In [24]:
orders_rdd_map4.take(5)

[('11599', 1), ('8827', 1), ('1837', 1), ('1205', 1), ('11441', 1)]

In [27]:
orders_rdd_reduce4  = orders_rdd_map4.reduceByKey(lambda x,y : x+y)

In [28]:
orders_rdd_reduce4.take(5)

[('3159', 1), ('5834', 2), ('10173', 1), ('2101', 1), ('6000', 1)]

In [29]:
sorted_orders_rdd4 = orders_rdd_reduce4.sortBy(lambda x:x[1],False)

In [30]:
sorted_orders_rdd4.take(10)

[('1833', 6),
 ('1363', 5),
 ('1687', 5),
 ('5493', 5),
 ('5011', 4),
 ('8974', 4),
 ('2321', 4),
 ('3736', 4),
 ('8368', 4),
 ('2236', 4)]