## Import Modules

In [1]:
import getpass as gp
from pyspark.sql import SparkSession

In [2]:
# Prefix the application name with the current login name, then create the
# session on YARN (or reuse one that already exists).
user_name = gp.getuser()
spark = (
    SparkSession.builder
    .appName(f'{user_name}-orders-data-program')
    .master('yarn')
    .getOrCreate()
)

In [3]:
spark

In [4]:
# Grab the SparkContext behind the session; the RDD API used below hangs off it.
sc = spark.sparkContext

In [5]:
sc

In [6]:
# Peek at the start of the first orders part file on HDFS. `-head` prints only
# the first kilobyte, so the last row shown may be cut off mid-record.
!hadoop fs -head /public/retail_db/orders/part-00000

1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE
6,2013-07-25 00:00:00.0,7130,COMPLETE
7,2013-07-25 00:00:00.0,4530,COMPLETE
8,2013-07-25 00:00:00.0,2911,PROCESSING
9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT
10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT
11,2013-07-25 00:00:00.0,918,PAYMENT_REVIEW
12,2013-07-25 00:00:00.0,1837,CLOSED
13,2013-07-25 00:00:00.0,9149,PENDING_PAYMENT
14,2013-07-25 00:00:00.0,9842,PROCESSING
15,2013-07-25 00:00:00.0,2568,COMPLETE
16,2013-07-25 00:00:00.0,7276,PENDING_PAYMENT
17,2013-07-25 00:00:00.0,2667,COMPLETE
18,2013-07-25 00:00:00.0,1205,CLOSED
19,2013-07-25 00:00:00.0,9488,PENDING_PAYMENT
20,2013-07-25 00:00:00.0,9198,PROCESSING
21,2013-07-25 00:00:00.0,2711,PENDING
22,2013-07-25 00:00:00.0,333,COMPLETE
23,2013-07-25 00:00:00.0,4367,PENDING_PAYMENT
24,2013-07-25 00:00:00.0,11441,CLOSED
25,2013-07-25 00:00:00

In [7]:
# HDFS directory that holds the orders part files.
# Plain string literal: there is nothing to interpolate, so no f-prefix is needed.
INPUT_FILE_PATH = '/public/retail_db/orders'

### Create Base File RDD

In [8]:
# Read every file under the orders directory as an RDD of raw text lines
# (one comma-separated order record per element).
rdd_file_input = sc.textFile(f'{INPUT_FILE_PATH}/*')

In [9]:
rdd_file_input.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [10]:
# Break each raw line into its comma-separated fields:
# [order id, order date, customer id, status].
rdd_split_file_input = rdd_file_input.map(lambda line: line.split(','))

In [11]:
rdd_split_file_input.take(5)

[['1', '2013-07-25 00:00:00.0', '11599', 'CLOSED'],
 ['2', '2013-07-25 00:00:00.0', '256', 'PENDING_PAYMENT'],
 ['3', '2013-07-25 00:00:00.0', '12111', 'COMPLETE'],
 ['4', '2013-07-25 00:00:00.0', '8827', 'CLOSED'],
 ['5', '2013-07-25 00:00:00.0', '11318', 'COMPLETE']]

### 1. Count of Orders for each STATUS

In [12]:
# Keep only the last field of every record, i.e. the order status.
rdd_orders_file_input = rdd_split_file_input.map(lambda fields: fields[-1])

In [13]:
rdd_orders_file_input.take(5)

['CLOSED', 'PENDING_PAYMENT', 'COMPLETE', 'CLOSED', 'COMPLETE']

#### a. using countByValue() action -> local dict

In [14]:
# countByValue() is an action: it tallies each distinct status and hands the
# result back to the driver as a local dict-like object (not an RDD).
output = rdd_orders_file_input.countByValue()

In [15]:
output

defaultdict(int,
            {'CLOSED': 7556,
             'PENDING_PAYMENT': 15030,
             'COMPLETE': 22899,
             'PROCESSING': 8275,
             'PAYMENT_REVIEW': 729,
             'PENDING': 7610,
             'ON_HOLD': 3798,
             'CANCELED': 1428,
             'SUSPECTED_FRAUD': 1558})

In [16]:
# Flatten the status -> count mapping into a list of (status, count) tuples so
# it can be sorted; list(items()) replaces the identity comprehension.
output = list(output.items())

In [17]:
output

[('CLOSED', 7556),
 ('PENDING_PAYMENT', 15030),
 ('COMPLETE', 22899),
 ('PROCESSING', 8275),
 ('PAYMENT_REVIEW', 729),
 ('PENDING', 7610),
 ('ON_HOLD', 3798),
 ('CANCELED', 1428),
 ('SUSPECTED_FRAUD', 1558)]

In [18]:
# Reorder the (status, count) tuples in place, largest count first.
output.sort(reverse=True, key=lambda pair: pair[1])

In [19]:
output

[('COMPLETE', 22899),
 ('PENDING_PAYMENT', 15030),
 ('PROCESSING', 8275),
 ('PENDING', 7610),
 ('CLOSED', 7556),
 ('ON_HOLD', 3798),
 ('SUSPECTED_FRAUD', 1558),
 ('CANCELED', 1428),
 ('PAYMENT_REVIEW', 729)]

#### b. using map() and reduceByKey() transformation -> distributed rdd

In [20]:
# Pair each order's status (last field) with a 1 so the ones can be summed per status.
rdd_orders_file_input = rdd_split_file_input.map(lambda fields: (fields[-1], 1))

In [21]:
rdd_orders_file_input.take(5)

[('CLOSED', 1),
 ('PENDING_PAYMENT', 1),
 ('COMPLETE', 1),
 ('CLOSED', 1),
 ('COMPLETE', 1)]

In [22]:
# Add up the ones for every status; the result is still a distributed RDD of
# (status, order count) pairs.
rdd_reduce_orders = rdd_orders_file_input.reduceByKey(
    lambda left, right: left + right
)

In [23]:
rdd_reduce_orders.take(10)

[('CLOSED', 7556),
 ('CANCELED', 1428),
 ('PENDING_PAYMENT', 15030),
 ('COMPLETE', 22899),
 ('PROCESSING', 8275),
 ('PAYMENT_REVIEW', 729),
 ('PENDING', 7610),
 ('ON_HOLD', 3798),
 ('SUSPECTED_FRAUD', 1558)]

In [24]:
# Order the (status, count) pairs by count, smallest first.
rdd_sorted_orders = rdd_reduce_orders.sortBy(
    lambda pair: pair[1],
    ascending=True,
)

In [25]:
rdd_sorted_orders.take(10)

[('PAYMENT_REVIEW', 729),
 ('CANCELED', 1428),
 ('SUSPECTED_FRAUD', 1558),
 ('ON_HOLD', 3798),
 ('CLOSED', 7556),
 ('PENDING', 7610),
 ('PROCESSING', 8275),
 ('PENDING_PAYMENT', 15030),
 ('COMPLETE', 22899)]

In [26]:
# Order the (status, count) pairs by count, largest first.
rdd_sorted_orders = rdd_reduce_orders.sortBy(
    lambda pair: pair[1],
    ascending=False,
)

In [27]:
rdd_sorted_orders.take(10)

[('COMPLETE', 22899),
 ('PENDING_PAYMENT', 15030),
 ('PROCESSING', 8275),
 ('PENDING', 7610),
 ('CLOSED', 7556),
 ('ON_HOLD', 3798),
 ('SUSPECTED_FRAUD', 1558),
 ('CANCELED', 1428),
 ('PAYMENT_REVIEW', 729)]

### 2. Find the top 10 CUSTOMERS who have placed the most orders

In [28]:
rdd_split_file_input.take(5)

[['1', '2013-07-25 00:00:00.0', '11599', 'CLOSED'],
 ['2', '2013-07-25 00:00:00.0', '256', 'PENDING_PAYMENT'],
 ['3', '2013-07-25 00:00:00.0', '12111', 'COMPLETE'],
 ['4', '2013-07-25 00:00:00.0', '8827', 'CLOSED'],
 ['5', '2013-07-25 00:00:00.0', '11318', 'COMPLETE']]

In [29]:
# Key every order by its customer id (third field) with a 1 to be summed per customer.
rdd_customer_input = rdd_split_file_input.map(lambda fields: (fields[2], 1))

In [30]:
rdd_customer_input.take(5)

[('11599', 1), ('256', 1), ('12111', 1), ('8827', 1), ('11318', 1)]

In [31]:
# Sum the ones per customer id to get (customer id, order count) pairs.
rdd_reduce_customers = rdd_customer_input.reduceByKey(
    lambda left, right: left + right
)

In [32]:
rdd_reduce_customers.take(10)

[('256', 10),
 ('12111', 6),
 ('11318', 6),
 ('7130', 7),
 ('2911', 6),
 ('5657', 12),
 ('9149', 4),
 ('9842', 7),
 ('7276', 5),
 ('9488', 7)]

In [33]:
# Order the (customer id, order count) pairs by count, largest first, so that
# take(10) yields the ten customers with the most orders.
rdd_sorted_customers = rdd_reduce_customers.sortBy(
    lambda pair: pair[1],
    ascending=False,
)

In [34]:
rdd_sorted_customers.take(10)

[('5897', 16),
 ('6316', 16),
 ('12431', 16),
 ('569', 16),
 ('4320', 15),
 ('221', 15),
 ('5624', 15),
 ('5283', 15),
 ('12284', 15),
 ('5654', 15)]

### 3. Find the count of DISTINCT customers

In [35]:
rdd_split_file_input.take(5)

[['1', '2013-07-25 00:00:00.0', '11599', 'CLOSED'],
 ['2', '2013-07-25 00:00:00.0', '256', 'PENDING_PAYMENT'],
 ['3', '2013-07-25 00:00:00.0', '12111', 'COMPLETE'],
 ['4', '2013-07-25 00:00:00.0', '8827', 'CLOSED'],
 ['5', '2013-07-25 00:00:00.0', '11318', 'COMPLETE']]

In [36]:
# Project each order down to just its customer id (third field).
rdd_customer_input = rdd_split_file_input.map(lambda fields: fields[2])

In [37]:
rdd_customer_input.take(5)

['11599', '256', '12111', '8827', '11318']

In [38]:
# distinct() drops duplicate values, leaving each customer id exactly once.
rdd_distinct_customers = rdd_customer_input.distinct()

In [39]:
rdd_distinct_customers.take(5)

['256', '12111', '11318', '7130', '2911']

In [40]:
# count() is an action that returns the number of elements in the RDD to the
# driver (the RDD analogue of len()); here that is the number of distinct customers.
count = rdd_distinct_customers.count()

In [41]:
count

12405

### 4. Find the top 10 CUSTOMERS with the most CLOSED orders

In [42]:
rdd_split_file_input.take(5)

[['1', '2013-07-25 00:00:00.0', '11599', 'CLOSED'],
 ['2', '2013-07-25 00:00:00.0', '256', 'PENDING_PAYMENT'],
 ['3', '2013-07-25 00:00:00.0', '12111', 'COMPLETE'],
 ['4', '2013-07-25 00:00:00.0', '8827', 'CLOSED'],
 ['5', '2013-07-25 00:00:00.0', '11318', 'COMPLETE']]

In [43]:
# filter() keeps only the records for which the predicate is true; here, the
# orders whose status (last field) is exactly 'CLOSED'.
rdd_filter_closed = rdd_split_file_input.filter(
    lambda fields: fields[-1] == 'CLOSED'
)

In [44]:
rdd_filter_closed.take(10)

[['1', '2013-07-25 00:00:00.0', '11599', 'CLOSED'],
 ['4', '2013-07-25 00:00:00.0', '8827', 'CLOSED'],
 ['12', '2013-07-25 00:00:00.0', '1837', 'CLOSED'],
 ['18', '2013-07-25 00:00:00.0', '1205', 'CLOSED'],
 ['24', '2013-07-25 00:00:00.0', '11441', 'CLOSED'],
 ['25', '2013-07-25 00:00:00.0', '9503', 'CLOSED'],
 ['37', '2013-07-25 00:00:00.0', '5863', 'CLOSED'],
 ['51', '2013-07-25 00:00:00.0', '12271', 'CLOSED'],
 ['57', '2013-07-25 00:00:00.0', '7073', 'CLOSED'],
 ['61', '2013-07-25 00:00:00.0', '4791', 'CLOSED']]

In [45]:
# Key every CLOSED order by its customer id (third field) with a 1 to be summed.
rdd_customer_input = rdd_filter_closed.map(lambda fields: (fields[2], 1))

In [46]:
rdd_customer_input.take(10)

[('11599', 1),
 ('8827', 1),
 ('1837', 1),
 ('1205', 1),
 ('11441', 1),
 ('9503', 1),
 ('5863', 1),
 ('12271', 1),
 ('7073', 1),
 ('4791', 1)]

In [47]:
# Sum the ones per customer id to get (customer id, closed-order count) pairs.
rdd_reduce_customers = rdd_customer_input.reduceByKey(
    lambda left, right: left + right
)

In [48]:
rdd_reduce_customers.take(10)

[('5863', 1),
 ('12271', 2),
 ('7073', 1),
 ('3065', 2),
 ('5116', 2),
 ('8763', 1),
 ('10604', 2),
 ('16', 1),
 ('9055', 3),
 ('10372', 3)]

In [49]:
# Order the (customer id, closed-order count) pairs by count (the last tuple
# element), largest first.
rdd_sorted_customers = rdd_reduce_customers.sortBy(
    lambda pair: pair[-1],
    ascending=False,
)

In [50]:
rdd_sorted_customers.take(10)

[('1833', 6),
 ('1363', 5),
 ('1687', 5),
 ('5493', 5),
 ('5011', 4),
 ('8974', 4),
 ('2321', 4),
 ('3736', 4),
 ('8368', 4),
 ('2236', 4)]

In [51]:
# Release the cluster resources held by this notebook. Stopping the session
# also stops its underlying SparkContext, so a separate sc.stop() is not needed.
spark.stop()

In [52]:
# End of file