In [1]:
from pyspark.sql import SparkSession
import getpass
# The Hive warehouse directory below is namespaced by the current OS user.
username = getpass.getuser()

# Build (or reuse) a Hive-enabled SparkSession that runs on YARN.
# NOTE(review): port '0' presumably lets Spark pick any free UI port -- confirm
# against the Spark configuration docs.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

In [2]:
# Bare expression: the notebook displays the SparkSession object as the cell output.
spark

In [3]:
# Read the orders file as an RDD of text lines (one string per record).
orders_rdd = spark.sparkContext.textFile(
    "/public/retail_db/orders/part-00000"
)

In [5]:
# take(n) is an action that returns only the first n records;
# collect() would instead return every record.
orders_rdd.take(10)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE',
 '6,2013-07-25 00:00:00.0,7130,COMPLETE',
 '7,2013-07-25 00:00:00.0,4530,COMPLETE',
 '8,2013-07-25 00:00:00.0,2911,PROCESSING',
 '9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT',
 '10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT']

# 1. Count orders under each status

In [6]:
# Field index 3 of each comma-separated line is the order status;
# emit (status, 1) so the ones can be summed per status afterwards.
mapped_rdd = orders_rdd.map(
    lambda line: (line.split(",")[3], 1)
)

In [7]:
# Peek at the first five (status, 1) pairs to check the mapping.
mapped_rdd.take(5)

[('CLOSED', 1),
 ('PENDING_PAYMENT', 1),
 ('COMPLETE', 1),
 ('CLOSED', 1),
 ('COMPLETE', 1)]

In [10]:
# Alternative to the reduceByKey approach: countByValue() is an action that
# returns the counts as a local dictionary rather than an RDD, so use it only
# when no further RDD transformation is needed afterwards.
# NOTE(review): the elements here are (status, 1) tuples, so the result is keyed
# by the whole tuple; countByKey() would key the result by status alone.
mapped_rdd.countByValue()

defaultdict(int,
            {('CLOSED', 1): 7556,
             ('PENDING_PAYMENT', 1): 15030,
             ('COMPLETE', 1): 22899,
             ('PROCESSING', 1): 8275,
             ('PAYMENT_REVIEW', 1): 729,
             ('PENDING', 1): 7610,
             ('ON_HOLD', 1): 3798,
             ('CANCELED', 1): 1428,
             ('SUSPECTED_FRAUD', 1): 1558})

# reduce returns a single value; reduceByKey returns one value per key

In [10]:
# Sum the 1s for each status to get the number of orders per status.
reduced_rdd = mapped_rdd.reduceByKey(
    lambda left, right: left + right
)

In [11]:
# Fetch every (status, count) pair; there is only one pair per distinct status.
reduced_rdd.collect()

[('CLOSED', 7556),
 ('CANCELED', 1428),
 ('PENDING_PAYMENT', 15030),
 ('COMPLETE', 22899),
 ('PROCESSING', 8275),
 ('PAYMENT_REVIEW', 729),
 ('PENDING', 7610),
 ('ON_HOLD', 3798),
 ('SUSPECTED_FRAUD', 1558)]

In [14]:
# Order the (status, count) pairs by count; sortBy sorts ascending by default.
sort_rdd = reduced_rdd.sortBy(lambda pair: pair[1])

In [15]:
# Show the statuses from the smallest to the largest order count.
sort_rdd.collect()

[('PAYMENT_REVIEW', 729),
 ('CANCELED', 1428),
 ('SUSPECTED_FRAUD', 1558),
 ('ON_HOLD', 3798),
 ('CLOSED', 7556),
 ('PENDING', 7610),
 ('PROCESSING', 8275),
 ('PENDING_PAYMENT', 15030),
 ('COMPLETE', 22899)]

In [16]:
# Same sort key (the count), but largest first.
sort_rdd = reduced_rdd.sortBy(lambda pair: pair[1], ascending=False)

In [17]:
# Show the statuses from the largest to the smallest order count.
sort_rdd.collect()

[('COMPLETE', 22899),
 ('PENDING_PAYMENT', 15030),
 ('PROCESSING', 8275),
 ('PENDING', 7610),
 ('CLOSED', 7556),
 ('ON_HOLD', 3798),
 ('SUSPECTED_FRAUD', 1558),
 ('CANCELED', 1428),
 ('PAYMENT_REVIEW', 729)]

# 2. Find the premium customers (the top 10 by number of orders placed)

In [3]:
# Read the orders file as an RDD of text lines (one string per record).
orders_rdd = spark.sparkContext.textFile(
    "/public/retail_db/orders/part-00000"
)

In [4]:
# Preview the first 20 raw order lines.
orders_rdd.take(20)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE',
 '6,2013-07-25 00:00:00.0,7130,COMPLETE',
 '7,2013-07-25 00:00:00.0,4530,COMPLETE',
 '8,2013-07-25 00:00:00.0,2911,PROCESSING',
 '9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT',
 '10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT',
 '11,2013-07-25 00:00:00.0,918,PAYMENT_REVIEW',
 '12,2013-07-25 00:00:00.0,1837,CLOSED',
 '13,2013-07-25 00:00:00.0,9149,PENDING_PAYMENT',
 '14,2013-07-25 00:00:00.0,9842,PROCESSING',
 '15,2013-07-25 00:00:00.0,2568,COMPLETE',
 '16,2013-07-25 00:00:00.0,7276,PENDING_PAYMENT',
 '17,2013-07-25 00:00:00.0,2667,COMPLETE',
 '18,2013-07-25 00:00:00.0,1205,CLOSED',
 '19,2013-07-25 00:00:00.0,9488,PENDING_PAYMENT',
 '20,2013-07-25 00:00:00.0,9198,PROCESSING']

In [5]:
# Field index 2 of each comma-separated line is the customer ID (kept as a
# string); emit (customer_id, 1) so the ones can be summed per customer.
mapped_rdd = orders_rdd.map(
    lambda line: (line.split(",")[2], 1)
)

In [6]:
# Preview the first 20 (customer_id, 1) pairs.
mapped_rdd.take(20)

[('11599', 1),
 ('256', 1),
 ('12111', 1),
 ('8827', 1),
 ('11318', 1),
 ('7130', 1),
 ('4530', 1),
 ('2911', 1),
 ('5657', 1),
 ('5648', 1),
 ('918', 1),
 ('1837', 1),
 ('9149', 1),
 ('9842', 1),
 ('2568', 1),
 ('7276', 1),
 ('2667', 1),
 ('1205', 1),
 ('9488', 1),
 ('9198', 1)]

In [7]:
# Sum the 1s for each customer ID to get the number of orders per customer.
reduced_rdd = mapped_rdd.reduceByKey(
    lambda left, right: left + right
)

In [8]:
# Preview 20 (customer_id, order_count) pairs; they are not yet sorted by count.
reduced_rdd.take(20)

[('256', 10),
 ('12111', 6),
 ('11318', 6),
 ('7130', 7),
 ('2911', 6),
 ('5657', 12),
 ('9149', 4),
 ('9842', 7),
 ('7276', 5),
 ('9488', 7),
 ('2711', 3),
 ('333', 6),
 ('656', 5),
 ('6983', 6),
 ('4189', 3),
 ('4840', 2),
 ('5863', 6),
 ('8214', 5),
 ('7776', 8),
 ('1549', 4)]

In [9]:
# Order customers by their order count, largest first.
sort_rdd = reduced_rdd.sortBy(lambda pair: pair[1], ascending=False)

In [11]:
# The first 10 elements of the descending sort are the 10 customers with the
# most orders.
sort_rdd.take(10)

[('5897', 16),
 ('6316', 16),
 ('12431', 16),
 ('569', 16),
 ('4320', 15),
 ('221', 15),
 ('5624', 15),
 ('5283', 15),
 ('12284', 15),
 ('5654', 15)]

# 3. Distinct count of customers who placed at least one order

In [12]:
# Read the orders file as an RDD of text lines (one string per record).
orders_rdd = spark.sparkContext.textFile(
    "/public/retail_db/orders/part-00000"
)

In [13]:
# Preview the first 20 raw order lines.
orders_rdd.take(20)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE',
 '6,2013-07-25 00:00:00.0,7130,COMPLETE',
 '7,2013-07-25 00:00:00.0,4530,COMPLETE',
 '8,2013-07-25 00:00:00.0,2911,PROCESSING',
 '9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT',
 '10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT',
 '11,2013-07-25 00:00:00.0,918,PAYMENT_REVIEW',
 '12,2013-07-25 00:00:00.0,1837,CLOSED',
 '13,2013-07-25 00:00:00.0,9149,PENDING_PAYMENT',
 '14,2013-07-25 00:00:00.0,9842,PROCESSING',
 '15,2013-07-25 00:00:00.0,2568,COMPLETE',
 '16,2013-07-25 00:00:00.0,7276,PENDING_PAYMENT',
 '17,2013-07-25 00:00:00.0,2667,COMPLETE',
 '18,2013-07-25 00:00:00.0,1205,CLOSED',
 '19,2013-07-25 00:00:00.0,9488,PENDING_PAYMENT',
 '20,2013-07-25 00:00:00.0,9198,PROCESSING']

In [16]:
# Keep only the customer ID (field index 2) of each order line, then drop
# duplicates so each customer appears exactly once.
mapped_rdd = (
    orders_rdd
    .map(lambda line: line.split(",")[2])
    .distinct()
)

In [17]:
# Preview 20 of the distinct customer IDs.
mapped_rdd.take(20)

['256',
 '12111',
 '11318',
 '7130',
 '2911',
 '5657',
 '9149',
 '9842',
 '7276',
 '9488',
 '2711',
 '333',
 '656',
 '6983',
 '4189',
 '4840',
 '5863',
 '8214',
 '7776',
 '1549']

In [19]:
# count() is an action: it returns the number of elements in the RDD, which
# here is the number of distinct customer IDs.
mapped_rdd.count()

12405

# 4. Which customer has the maximum number of closed orders?

In [2]:
# Read the orders file as an RDD of text lines (one string per record).
orders_rdd = spark.sparkContext.textFile(
    "/public/retail_db/orders/part-00000"
)

In [3]:
# Preview the first 20 raw order lines.
orders_rdd.take(20)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE',
 '6,2013-07-25 00:00:00.0,7130,COMPLETE',
 '7,2013-07-25 00:00:00.0,4530,COMPLETE',
 '8,2013-07-25 00:00:00.0,2911,PROCESSING',
 '9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT',
 '10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT',
 '11,2013-07-25 00:00:00.0,918,PAYMENT_REVIEW',
 '12,2013-07-25 00:00:00.0,1837,CLOSED',
 '13,2013-07-25 00:00:00.0,9149,PENDING_PAYMENT',
 '14,2013-07-25 00:00:00.0,9842,PROCESSING',
 '15,2013-07-25 00:00:00.0,2568,COMPLETE',
 '16,2013-07-25 00:00:00.0,7276,PENDING_PAYMENT',
 '17,2013-07-25 00:00:00.0,2667,COMPLETE',
 '18,2013-07-25 00:00:00.0,1205,CLOSED',
 '19,2013-07-25 00:00:00.0,9488,PENDING_PAYMENT',
 '20,2013-07-25 00:00:00.0,9198,PROCESSING']

In [4]:
# Keep only the order lines whose status field (index 3) is exactly 'CLOSED'.
# Despite the variable name, this is a filter: elements stay as raw text lines.
mapped_rdd = orders_rdd.filter(
    lambda line: line.split(",")[3] == 'CLOSED'
)

In [5]:
# Preview the first 20 order lines that passed the CLOSED filter.
mapped_rdd.take(20)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '12,2013-07-25 00:00:00.0,1837,CLOSED',
 '18,2013-07-25 00:00:00.0,1205,CLOSED',
 '24,2013-07-25 00:00:00.0,11441,CLOSED',
 '25,2013-07-25 00:00:00.0,9503,CLOSED',
 '37,2013-07-25 00:00:00.0,5863,CLOSED',
 '51,2013-07-25 00:00:00.0,12271,CLOSED',
 '57,2013-07-25 00:00:00.0,7073,CLOSED',
 '61,2013-07-25 00:00:00.0,4791,CLOSED',
 '62,2013-07-25 00:00:00.0,9111,CLOSED',
 '87,2013-07-25 00:00:00.0,3065,CLOSED',
 '90,2013-07-25 00:00:00.0,9131,CLOSED',
 '101,2013-07-25 00:00:00.0,5116,CLOSED',
 '116,2013-07-26 00:00:00.0,8763,CLOSED',
 '129,2013-07-26 00:00:00.0,9937,CLOSED',
 '133,2013-07-26 00:00:00.0,10604,CLOSED',
 '191,2013-07-26 00:00:00.0,16,CLOSED',
 '201,2013-07-26 00:00:00.0,9055,CLOSED',
 '211,2013-07-26 00:00:00.0,10372,CLOSED']

In [6]:
# For every closed order emit (customer_id, 1); the customer ID is field index 2.
mapped_rdd1 = mapped_rdd.map(
    lambda line: (line.split(",")[2], 1)
)

In [8]:
# Preview the first 20 (customer_id, 1) pairs built from the closed orders.
mapped_rdd1.take(20)

[('11599', 1),
 ('8827', 1),
 ('1837', 1),
 ('1205', 1),
 ('11441', 1),
 ('9503', 1),
 ('5863', 1),
 ('12271', 1),
 ('7073', 1),
 ('4791', 1),
 ('9111', 1),
 ('3065', 1),
 ('9131', 1),
 ('5116', 1),
 ('8763', 1),
 ('9937', 1),
 ('10604', 1),
 ('16', 1),
 ('9055', 1),
 ('10372', 1)]

In [9]:
# Sum the 1s for each customer ID to get the closed-order count per customer.
mapped_rdd11 = mapped_rdd1.reduceByKey(
    lambda left, right: left + right
)

In [10]:
# Preview 20 (customer_id, closed_order_count) pairs; they are not yet sorted by count.
mapped_rdd11.take(20)

[('5863', 1),
 ('12271', 2),
 ('7073', 1),
 ('3065', 2),
 ('5116', 2),
 ('8763', 1),
 ('10604', 2),
 ('16', 1),
 ('9055', 3),
 ('10372', 3),
 ('11715', 1),
 ('5925', 1),
 ('8309', 3),
 ('948', 1),
 ('5191', 1),
 ('7650', 2),
 ('4199', 2),
 ('6989', 1),
 ('5011', 4),
 ('11394', 1)]

In [11]:
# Order customers by their closed-order count, largest first.
sort_rdd = mapped_rdd11.sortBy(lambda pair: pair[1], ascending=False)

In [12]:
# Because the sort is descending, the first element returned is the customer
# with the most closed orders; the remaining rows show the runners-up.
sort_rdd.take(20)

[('1833', 6),
 ('1363', 5),
 ('1687', 5),
 ('5493', 5),
 ('5011', 4),
 ('8974', 4),
 ('2321', 4),
 ('3736', 4),
 ('8368', 4),
 ('2236', 4),
 ('2403', 4),
 ('7879', 4),
 ('1764', 4),
 ('4588', 4),
 ('7948', 4),
 ('7850', 4),
 ('145', 4),
 ('4282', 4),
 ('9213', 4),
 ('3631', 4)]