In [37]:
# Creating the Spark Session

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("spark_core_APIs_1") \
    .config("spark.dynamicAllocation.enabled","False") \
    .master("local[*]") \
    .getOrCreate()

spark

In [2]:
spark

In [3]:
def square(x):
    return x ** 2

numbers = [1, 2, 3, 4, 5]
squared_numbers = list(map(square, numbers))
print(squared_numbers)

[1, 4, 9, 16, 25]


In [None]:
numbers = [1, 2, 3, 4, 5]
squared_numbers = list(map(lambda x: x *** 2, numbers))
print(squared_numbers)

In [4]:
my_list = [10, 13, 44, 66, 99, 100]

In [6]:
 cubed_numbers= list(map(lambda x: x **3, my_list))

In [7]:
print(cubed_numbers)

[1000, 2197, 85184, 287496, 970299, 1000000]


# Reading a File in RDDs (at the core level)

In [9]:
orders_rdd = spark.sparkContext.textFile("/home/jupyter/module_wise_notebooks/module_4/part-00000")

In [11]:
orders_rdd.take(10)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE',
 '6,2013-07-25 00:00:00.0,7130,COMPLETE',
 '7,2013-07-25 00:00:00.0,4530,COMPLETE',
 '8,2013-07-25 00:00:00.0,2911,PROCESSING',
 '9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT',
 '10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT']

In [None]:
order_id, order_date, customer_id, order_status

## 1. count the orders under each status
## 2. find the premium customers (Top 10 who placed the most number of orders)
## 3. distinct count of customers who placed atleast one order
## 4. which customers has the maximum number of CLOSED orders


In [12]:
orders_rdd.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

# count the orders under each status

In [14]:
mapped_rdd = orders_rdd.map(lambda x: (x.split(",")[3],1))

In [16]:
mapped_rdd.take(20)

[('CLOSED', 1),
 ('PENDING_PAYMENT', 1),
 ('COMPLETE', 1),
 ('CLOSED', 1),
 ('COMPLETE', 1),
 ('COMPLETE', 1),
 ('COMPLETE', 1),
 ('PROCESSING', 1),
 ('PENDING_PAYMENT', 1),
 ('PENDING_PAYMENT', 1),
 ('PAYMENT_REVIEW', 1),
 ('CLOSED', 1),
 ('PENDING_PAYMENT', 1),
 ('PROCESSING', 1),
 ('COMPLETE', 1),
 ('PENDING_PAYMENT', 1),
 ('COMPLETE', 1),
 ('CLOSED', 1),
 ('PENDING_PAYMENT', 1),
 ('PROCESSING', 1)]

In [17]:
reduced_rdd = mapped_rdd.reduceByKey(lambda x,y: x+y)

In [18]:
reduced_rdd.collect()

[('CLOSED', 7556),
 ('CANCELED', 1428),
 ('PENDING_PAYMENT', 15030),
 ('COMPLETE', 22899),
 ('PROCESSING', 8275),
 ('PAYMENT_REVIEW', 729),
 ('PENDING', 7610),
 ('ON_HOLD', 3798),
 ('SUSPECTED_FRAUD', 1558)]

In [19]:
reduced_sorted = reduced_rdd.sortBy(lambda x:x[1],True)
reduced_sorted.collect()

[('PAYMENT_REVIEW', 729),
 ('CANCELED', 1428),
 ('SUSPECTED_FRAUD', 1558),
 ('ON_HOLD', 3798),
 ('CLOSED', 7556),
 ('PENDING', 7610),
 ('PROCESSING', 8275),
 ('PENDING_PAYMENT', 15030),
 ('COMPLETE', 22899)]

In [20]:
reduced_sorted = reduced_rdd.sortBy(lambda x:x[1],False)
reduced_sorted.collect()

[('COMPLETE', 22899),
 ('PENDING_PAYMENT', 15030),
 ('PROCESSING', 8275),
 ('PENDING', 7610),
 ('CLOSED', 7556),
 ('ON_HOLD', 3798),
 ('SUSPECTED_FRAUD', 1558),
 ('CANCELED', 1428),
 ('PAYMENT_REVIEW', 729)]

# 2. find the premium customers (Top 10 who placed the most number of orders)

In [21]:
orders_rdd.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [24]:
customers_mapped = orders_rdd.map(lambda x:(x.split(",")[2],1))
customers_mapped.take(10)

[('11599', 1),
 ('256', 1),
 ('12111', 1),
 ('8827', 1),
 ('11318', 1),
 ('7130', 1),
 ('4530', 1),
 ('2911', 1),
 ('5657', 1),
 ('5648', 1)]

In [27]:
customers_aggregated = customers_mapped.reduceByKey(lambda x,y:x+y)
customers_aggregated.take(10)

[('256', 10),
 ('12111', 6),
 ('11318', 6),
 ('7130', 7),
 ('2911', 6),
 ('5657', 12),
 ('9149', 4),
 ('9842', 7),
 ('7276', 5),
 ('9488', 7)]

In [28]:
customers_sorted = customers_aggregated.sortBy(lambda x:x[1],False)

In [29]:
customers_sorted.take(10)

[('5897', 16),
 ('6316', 16),
 ('12431', 16),
 ('569', 16),
 ('4320', 15),
 ('221', 15),
 ('5624', 15),
 ('5283', 15),
 ('12284', 15),
 ('5654', 15)]

In [None]:
select top 10 count(*), order_id from orders group by order_id limit 10

# 3. distinct count of customers who placed atleast one order

In [32]:
distinct_customers = orders_rdd.map(lambda x:(x.split(",")[2])).distinct()

In [35]:
distinct_customers.count()

12405

In [36]:
orders_rdd.distinct().count()

68883

# 4. which customers has the maximum number of CLOSED orders

In [39]:
orders_rdd.take(20)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE',
 '6,2013-07-25 00:00:00.0,7130,COMPLETE',
 '7,2013-07-25 00:00:00.0,4530,COMPLETE',
 '8,2013-07-25 00:00:00.0,2911,PROCESSING',
 '9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT',
 '10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT',
 '11,2013-07-25 00:00:00.0,918,PAYMENT_REVIEW',
 '12,2013-07-25 00:00:00.0,1837,CLOSED',
 '13,2013-07-25 00:00:00.0,9149,PENDING_PAYMENT',
 '14,2013-07-25 00:00:00.0,9842,PROCESSING',
 '15,2013-07-25 00:00:00.0,2568,COMPLETE',
 '16,2013-07-25 00:00:00.0,7276,PENDING_PAYMENT',
 '17,2013-07-25 00:00:00.0,2667,COMPLETE',
 '18,2013-07-25 00:00:00.0,1205,CLOSED',
 '19,2013-07-25 00:00:00.0,9488,PENDING_PAYMENT',
 '20,2013-07-25 00:00:00.0,9198,PROCESSING']

In [40]:
filtered_orders = orders_rdd.filter(lambda x: x.split(",")[3] == "CLOSED")

In [41]:
filtered_orders.take(10)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '12,2013-07-25 00:00:00.0,1837,CLOSED',
 '18,2013-07-25 00:00:00.0,1205,CLOSED',
 '24,2013-07-25 00:00:00.0,11441,CLOSED',
 '25,2013-07-25 00:00:00.0,9503,CLOSED',
 '37,2013-07-25 00:00:00.0,5863,CLOSED',
 '51,2013-07-25 00:00:00.0,12271,CLOSED',
 '57,2013-07-25 00:00:00.0,7073,CLOSED',
 '61,2013-07-25 00:00:00.0,4791,CLOSED']

In [42]:
filtered_mapped = filtered_orders.map(lambda x:(x.split(",")[2],1))

In [43]:
filtered_mapped.take(10)

[('11599', 1),
 ('8827', 1),
 ('1837', 1),
 ('1205', 1),
 ('11441', 1),
 ('9503', 1),
 ('5863', 1),
 ('12271', 1),
 ('7073', 1),
 ('4791', 1)]

In [44]:
filtered_aggregated = filtered_mapped.reduceByKey(lambda x,y:x+y)

In [45]:
filtered_aggregated.take(5)

[('5863', 1), ('12271', 2), ('7073', 1), ('3065', 2), ('5116', 2)]

In [46]:
filtered_sorted = filtered_aggregated.sortBy(lambda x:x[1],False)
filtered_sorted.take(100)

[('1833', 6),
 ('1363', 5),
 ('1687', 5),
 ('5493', 5),
 ('5011', 4),
 ('8974', 4),
 ('2321', 4),
 ('3736', 4),
 ('8368', 4),
 ('2236', 4),
 ('2403', 4),
 ('7879', 4),
 ('1764', 4),
 ('4588', 4),
 ('7948', 4),
 ('7850', 4),
 ('145', 4),
 ('4282', 4),
 ('9213', 4),
 ('3631', 4),
 ('1443', 4),
 ('4573', 4),
 ('4997', 4),
 ('10018', 4),
 ('9740', 4),
 ('4596', 4),
 ('2430', 4),
 ('11833', 4),
 ('9830', 4),
 ('9804', 4),
 ('8630', 4),
 ('1345', 4),
 ('437', 4),
 ('1521', 4),
 ('569', 4),
 ('9260', 4),
 ('2774', 4),
 ('12431', 4),
 ('1860', 4),
 ('5319', 4),
 ('10263', 4),
 ('5582', 4),
 ('2768', 4),
 ('10111', 4),
 ('9055', 3),
 ('10372', 3),
 ('8309', 3),
 ('11326', 3),
 ('8466', 3),
 ('10255', 3),
 ('6413', 3),
 ('4331', 3),
 ('6339', 3),
 ('4435', 3),
 ('7348', 3),
 ('11476', 3),
 ('10265', 3),
 ('1655', 3),
 ('2842', 3),
 ('4552', 3),
 ('3167', 3),
 ('11175', 3),
 ('5532', 3),
 ('645', 3),
 ('9458', 3),
 ('6512', 3),
 ('2467', 3),
 ('12024', 3),
 ('2039', 3),
 ('7029', 3),
 ('7919', 3)

In [47]:
filtered_sorted.take(10)

[('1833', 6),
 ('1363', 5),
 ('1687', 5),
 ('5493', 5),
 ('5011', 4),
 ('8974', 4),
 ('2321', 4),
 ('3736', 4),
 ('8368', 4),
 ('2236', 4)]