In [1]:
from pyspark.sql import SparkSession
import getpass
# OS-level name of the user running this notebook; used below to build a
# per-user warehouse directory.
username = getpass.getuser()

# Build (or reuse, via getOrCreate) a Hive-enabled Spark session with YARN as
# the master. A parenthesized chain replaces the backslash continuations.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

In [2]:
# Display the session object to confirm it was created.
spark

In [3]:
# Read the orders file as an RDD of raw text lines.
ORDERS_PATH = "/public/trendytech/retail_db/orders/part-00000"
orders_rdd = spark.sparkContext.textFile(ORDERS_PATH)

In [4]:
# Peek at the first 15 raw lines to see the comma-separated record layout.
orders_rdd.take(15)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE',
 '6,2013-07-25 00:00:00.0,7130,COMPLETE',
 '7,2013-07-25 00:00:00.0,4530,COMPLETE',
 '8,2013-07-25 00:00:00.0,2911,PROCESSING',
 '9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT',
 '10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT',
 '11,2013-07-25 00:00:00.0,918,PAYMENT_REVIEW',
 '12,2013-07-25 00:00:00.0,1837,CLOSED',
 '13,2013-07-25 00:00:00.0,9149,PENDING_PAYMENT',
 '14,2013-07-25 00:00:00.0,9842,PROCESSING',
 '15,2013-07-25 00:00:00.0,2568,COMPLETE']

In [None]:
# 1. Count the orders in each order status

In [5]:
# map transformation: pair each order's status (the 4th comma-separated field)
# with a count of 1.
mapped_rdd = orders_rdd.map(lambda line: (line.split(",")[3], 1))

In [6]:
# reduceByKey transformation: add up the 1s for each status key, giving the
# number of orders per status.
reduced_rdd = mapped_rdd.reduceByKey(lambda left, right: left + right)

In [7]:
# Show the (status, order_count) pairs.
reduced_rdd.collect()

[('CLOSED', 7556),
 ('CANCELED', 1428),
 ('COMPLETE', 22899),
 ('PENDING_PAYMENT', 15030),
 ('SUSPECTED_FRAUD', 1558),
 ('PENDING', 7610),
 ('ON_HOLD', 3798),
 ('PROCESSING', 8275),
 ('PAYMENT_REVIEW', 729)]

In [8]:
# Order the (status, count) pairs by count, largest first.
reduced_sorted_rdd = reduced_rdd.sortBy(lambda pair: pair[1], ascending=False)

In [9]:
# Show the per-status counts in descending order of count.
reduced_sorted_rdd.collect()

[('COMPLETE', 22899),
 ('PENDING_PAYMENT', 15030),
 ('PROCESSING', 8275),
 ('PENDING', 7610),
 ('CLOSED', 7556),
 ('ON_HOLD', 3798),
 ('SUSPECTED_FRAUD', 1558),
 ('CANCELED', 1428),
 ('PAYMENT_REVIEW', 729)]

In [None]:
# 2. Find the premium customers (the top 10 customers by number of orders placed)

In [10]:
# Look at a few raw records again; the customer id used below is the 3rd field.
orders_rdd.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [23]:
# Pair each order's customer id (the 3rd comma-separated field, kept as a
# string) with a count of 1.
mapped_cust = orders_rdd.map(lambda line: (line.split(",")[2], 1))

In [24]:
# Sanity-check the (customer_id, 1) pairs.
mapped_cust.take(10)

[('11599', 1),
 ('256', 1),
 ('12111', 1),
 ('8827', 1),
 ('11318', 1),
 ('7130', 1),
 ('4530', 1),
 ('2911', 1),
 ('5657', 1),
 ('5648', 1)]

In [25]:
# Add up the 1s per customer id to get each customer's total number of orders.
reduced_cust = mapped_cust.reduceByKey(lambda left, right: left + right)

In [26]:
# Inspect a sample of the (customer_id, order_count) pairs.
reduced_cust.take(10)

[('3066', 6),
 ('3159', 7),
 ('8135', 11),
 ('2248', 4),
 ('6117', 6),
 ('7733', 7),
 ('6540', 3),
 ('4882', 8),
 ('6060', 7),
 ('10436', 8)]

In [27]:
# Rank customers by their order count, highest first.
reduced_sorted_cust = reduced_cust.sortBy(lambda pair: pair[1], ascending=False)

In [28]:
# Top 10 premium customers: the first ten (customer_id, order_count) pairs of
# the descending sort.
# NOTE(review): the sort key is the count only, so customers tied on count have
# no explicit tie-breaker and their relative order may vary between runs — confirm.
reduced_sorted_cust.take(10)

[('569', 16),
 ('6316', 16),
 ('12431', 16),
 ('5897', 16),
 ('12284', 15),
 ('5654', 15),
 ('5283', 15),
 ('4320', 15),
 ('221', 15),
 ('5624', 15)]

In [None]:
# 3. Count distinct customers

In [5]:
# Re-check the raw record layout before extracting the customer id (3rd field).
orders_rdd.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [14]:
# Keep only the customer id (3rd field) of every order, then drop duplicates
# so each customer appears once.
distinct_cust = (
    orders_rdd
    .map(lambda line: line.split(",")[2])
    .distinct()
)

In [15]:
# count() is an action: it runs the job and returns the number of distinct customers.
# On an RDD, count() is always an action; the transformation of the same name
# belongs to the DataFrame API (e.g. df.groupBy(...).count()).
distinct_cust.count()

12405

In [16]:
# Total number of order records, for comparison with the distinct-customer count.
orders_rdd.count()

68883

In [None]:
# 4. Which customer has the maximum number of CLOSED orders?

In [17]:
# Re-check the raw record layout; the status used for filtering is the 4th field.
orders_rdd.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [37]:
# Keep only the orders whose status (4th field) is exactly 'CLOSED'.
filtered_orders = orders_rdd.filter(lambda line: line.split(",")[3] == 'CLOSED')

In [38]:
# Verify that only CLOSED orders remain after the filter.
filtered_orders.take(15)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '12,2013-07-25 00:00:00.0,1837,CLOSED',
 '18,2013-07-25 00:00:00.0,1205,CLOSED',
 '24,2013-07-25 00:00:00.0,11441,CLOSED',
 '25,2013-07-25 00:00:00.0,9503,CLOSED',
 '37,2013-07-25 00:00:00.0,5863,CLOSED',
 '51,2013-07-25 00:00:00.0,12271,CLOSED',
 '57,2013-07-25 00:00:00.0,7073,CLOSED',
 '61,2013-07-25 00:00:00.0,4791,CLOSED',
 '62,2013-07-25 00:00:00.0,9111,CLOSED',
 '87,2013-07-25 00:00:00.0,3065,CLOSED',
 '90,2013-07-25 00:00:00.0,9131,CLOSED',
 '101,2013-07-25 00:00:00.0,5116,CLOSED',
 '116,2013-07-26 00:00:00.0,8763,CLOSED']

In [39]:
# Pair each CLOSED order's customer id (3rd field) with a count of 1.
filtered_mapped = filtered_orders.map(lambda line: (line.split(",")[2], 1))

In [40]:
# Add up the 1s per customer id: the number of CLOSED orders for each customer.
filtered_aggregated = filtered_mapped.reduceByKey(lambda left, right: left + right)

In [41]:
# Inspect a sample of the (customer_id, closed_order_count) pairs.
filtered_aggregated.take(10)

[('3159', 1),
 ('5834', 2),
 ('10173', 1),
 ('2101', 1),
 ('6000', 1),
 ('1352', 2),
 ('10142', 1),
 ('12210', 1),
 ('6018', 2),
 ('2252', 1)]

In [43]:
# Rank customers by their number of CLOSED orders, highest first.
filtered_sorted = filtered_aggregated.sortBy(lambda pair: pair[1], ascending=False)

In [45]:
# First element of the descending sort: the customer with the most CLOSED orders.
# NOTE(review): if several customers share the maximum count, only one of them
# is returned and which one is not pinned down by the sort key — confirm.
filtered_sorted.take(1)

[('1833', 6)]