In [1]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("MyApp").setMaster("local[*]")
sc = SparkContext(conf=conf)

In [2]:
rdd = sc.parallelize([10,20,30,40])

In [4]:
type(rdd)
print(rdd.collect())

[10, 20, 30, 40]


In [5]:
sc.stop()

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('demo spark')\
        .master("local[1]")\
        .getOrCreate()

In [7]:
rdd =spark.sparkContext.parallelize([10,20,30,40])

In [8]:
rdd.collect()

[10, 20, 30, 40]

In [9]:
rdd.take(2)

[10, 20]

In [10]:
# Creating rdd using local file
rdd =spark.sparkContext.textFile('c:\data\employees.txt')
rdd.collect()

['101,John,50000,HR',
 '102,Asha,60000,Finance',
 '103,Ravi,55000,IT',
 '104,Divya,70000,Marketing',
 '105,Krish,45000,Admin']

In [None]:
#creating rdd using whole text file
    # Each entire file is one record
    # You want filename + content pairs

In [15]:
rdd = spark.sparkContext.wholeTextFiles("C:/data/artical/")
rdd.collect()

[('file:/C:/data/artical/artical1.txt',
  'Apache Spark is a fast and general-purpose cluster computing system.\r\nIt provides high-level APIs in Java, Scala, Python, and R.'),
 ('file:/C:/data/artical/artical2.txt',
  'PySpark makes data processing easier by integrating Python with Spark.\r\nRDD, DataFrames, and SQL APIs help to process data efficiently.'),
 ('file:/C:/data/artical/artical3.txt',
  'PySpark 100 time faster than Hadoop MapReduce ')]

In [16]:
for name,content in rdd.collect():
    print(name)
    print(content)
    print()


file:/C:/data/artical/artical1.txt
Apache Spark is a fast and general-purpose cluster computing system.
It provides high-level APIs in Java, Scala, Python, and R.

file:/C:/data/artical/artical2.txt
PySpark makes data processing easier by integrating Python with Spark.
RDD, DataFrames, and SQL APIs help to process data efficiently.

file:/C:/data/artical/artical3.txt
PySpark 100 time faster than Hadoop MapReduce 



In [18]:
# Creating rdd from existint rdd
rdd = spark.sparkContext.range(10)
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [19]:
rdd1 = rdd

In [20]:
type(rdd1)

pyspark.core.rdd.PipelinedRDD

In [21]:
# creating empty rdd
rdd = spark.sparkContext.emptyRDD()
rdd.collect()

[]

In [22]:
# creating rdd with dataframe
df = spark.read.csv('c:/data/employees.txt')

In [23]:
type(df)

pyspark.sql.classic.dataframe.DataFrame

In [None]:
type(df)

In [25]:
rdd = df.rdd
rdd.collect()

[Row(_c0='101', _c1='John', _c2='50000', _c3='HR'),
 Row(_c0='102', _c1='Asha', _c2='60000', _c3='Finance'),
 Row(_c0='103', _c1='Ravi', _c2='55000', _c3='IT'),
 Row(_c0='104', _c1='Divya', _c2='70000', _c3='Marketing'),
 Row(_c0='105', _c1='Krish', _c2='45000', _c3='Admin')]

In [26]:
rdd.getNumPartitions()

1

In [27]:
rdd = spark.sparkContext.textFile('c:/data/employees.txt',4)

In [28]:
rdd.getNumPartitions()

4

In [29]:
rdd.glom().collect()

[['101,John,50000,HR', '102,Asha,60000,Finance'],
 ['103,Ravi,55000,IT'],
 ['104,Divya,70000,Marketing'],
 ['105,Krish,45000,Admin']]

In [30]:
ord =spark.sparkContext.textFile('c:/data/Orders')
ordItems=spark.sparkContext.textFile('c:/data/OrderItems')


In [31]:
ord.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [51]:
def toLower(str):
    return str.lower()
ordMap = ord.map(lambda x : x.split(',')[3].lower())
ordMap.take(5)

['closed', 'pending_payment', 'complete', 'closed', 'complete']

In [38]:
ordMap = ord.map(lambda x: int(x.split(',')[0]))
ordMap.take(10)

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [42]:
ordMap = ord.map(lambda x :  x.split(',')[0]+'#'+ x.split(',')[3])
for x in ordMap.take(10):
    print(x)
                 

1#CLOSED
2#PENDING_PAYMENT
3#COMPLETE
4#CLOSED
5#COMPLETE
6#COMPLETE
7#COMPLETE
8#PROCESSING
9#PENDING_PAYMENT
10#PENDING_PAYMENT


In [45]:
ordMap = ord.map(lambda x : x.split(',')[1].split(' ')[0].replace('-','/'))
ordMap.take(5)

['2013/07/25', '2013/07/25', '2013/07/25', '2013/07/25', '2013/07/25']

In [47]:
ordMap = ord.map(lambda x : (x.split(',')[0], x))
ordMap.take(4)

[('1', '1,2013-07-25 00:00:00.0,11599,CLOSED'),
 ('2', '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT'),
 ('3', '3,2013-07-25 00:00:00.0,12111,COMPLETE'),
 ('4', '4,2013-07-25 00:00:00.0,8827,CLOSED')]

In [32]:
ordItems.take(5)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99',
 '5,4,897,2,49.98,24.99']

In [48]:
ordItemsMap = ordItems.map(lambda x : ( x.split(',')[0] , float(x.split(',')[4])))
ordItemsMap.take(5)
                           

[('1', 299.98), ('2', 199.99), ('3', 250.0), ('4', 129.99), ('5', 49.98)]

In [None]:
# The map() transformation in PySpark is used to apply a function to each element of an RDD and return a new RDD.

In [52]:
rdd = spark.sparkContext.parallelize([10,20,30,40])
rddMap = rdd.map(lambda x : x *x )
rddMap.collect()               
                                    

[100, 400, 900, 1600]

In [53]:
rdd = spark.sparkContext.parallelize(["cat", "tiger", "elephant"])
result = rdd.map(lambda x: len(x))
result.collect()

[3, 5, 8]

In [54]:
rdd = spark.sparkContext.parallelize([('ram',5000),('sham',4000),('prem',6000)])
rddMap = rdd.map(lambda x : ( x [0],  x[1]+1000))
rddMap.collect()

[('ram', 6000), ('sham', 5000), ('prem', 7000)]

In [None]:
# extract orderId form ord rdd
ordMap=ord.map(lambda x : x.split(',')[0])
for i in ordMap.take(5):
    print(i)

In [None]:
# extract orderId & Order status form ord rdd
ordMap = ord.map(lambda x : x.split(',')[0] +' ' + x.split(',')[3].lower())
for x in ordMap.take(5):
    print(x)


In [None]:
# extract order_status form ord rdd and convert into lower case
ordMap = ord.map(lambda x : x.split(',')[3].lower())
for x in ordMap.take(5):
    print(x)


In [None]:
#extract orderDate and conver orderdate into yyyy/mm/dd format
ordMap = ord.map(lambda x : x.split(',')[1].split(' ')[0].replace('-','/'))
ordMap.take(5)

In [None]:
# Create key-value pairs with key as Order id and values as whole records
ordMap = ord.map(lambda x :  x.split(',')[0]+':'+ x)
for x in ordMap.take(5):
    print(x)


In [None]:
# Project all the Order_item_ids and their subtotal.
ordItemsMap = ordItems.map(lambda x : (x.split(',')[0],x.split(',')[4]))
ordItemsMap.take(5)


In [None]:
ordItemsMap = ordItems.map(lambda x : x.split(',')[0])
ordItemsMap.take(5)

In [None]:
# Applied user defined function to convert status into lowercase.
def lowerCase(str):
    return str.lower()


ordMap = ord.map(lambda x : lowerCase(x.split(',')[3]) )
ordMap.take(5)


In [None]:
# flatmap
# Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
# Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). Number of records in input is less than or equal to output.


In [55]:
ordMap = ord.map(lambda x : x.split(','))
ordMap.take(10)

[['1', '2013-07-25 00:00:00.0', '11599', 'CLOSED'],
 ['2', '2013-07-25 00:00:00.0', '256', 'PENDING_PAYMENT'],
 ['3', '2013-07-25 00:00:00.0', '12111', 'COMPLETE'],
 ['4', '2013-07-25 00:00:00.0', '8827', 'CLOSED'],
 ['5', '2013-07-25 00:00:00.0', '11318', 'COMPLETE'],
 ['6', '2013-07-25 00:00:00.0', '7130', 'COMPLETE'],
 ['7', '2013-07-25 00:00:00.0', '4530', 'COMPLETE'],
 ['8', '2013-07-25 00:00:00.0', '2911', 'PROCESSING'],
 ['9', '2013-07-25 00:00:00.0', '5657', 'PENDING_PAYMENT'],
 ['10', '2013-07-25 00:00:00.0', '5648', 'PENDING_PAYMENT']]

In [58]:
ordFlatMap = ord.flatMap(lambda x : x.split(',')).map(lambda x :(x,1)).reduceByKey(lambda x, y : x+y)
ordFlatMap.take(20)

[('1', 2),
 ('2013-07-25 00:00:00.0', 143),
 ('11599', 6),
 ('CLOSED', 7556),
 ('2', 5),
 ('256', 11),
 ('PENDING_PAYMENT', 15030),
 ('3', 8),
 ('12111', 7),
 ('COMPLETE', 22899),
 ('4', 7),
 ('8827', 7),
 ('5', 5),
 ('11318', 7),
 ('6', 5),
 ('7130', 8),
 ('7', 9),
 ('4530', 11),
 ('8', 9),
 ('2911', 7)]

In [4]:
wordCount = ord.flatMap(lambda x : x.split(',')).map(lambda x : (x,1)).reduceByKey(lambda x, y: x +y)
wordCount.take(10)

[('1', 2),
 ('2013-07-25 00:00:00.0', 143),
 ('11599', 6),
 ('CLOSED', 7556),
 ('2', 5),
 ('256', 11),
 ('PENDING_PAYMENT', 15030),
 ('3', 8),
 ('12111', 7),
 ('COMPLETE', 22899)]

In [4]:
rdd = spark.sparkContext.parallelize([1,2,3,4,5,6])
rdd.filter( lambda x : x % 2==0).collect()

[2, 4, 6]

In [6]:
data = ["apple", "banana", "apricot", "orange"]
rdd =spark.sparkContext.parallelize(data)

result = rdd.filter(lambda x: "ap" in x)

print(result.collect())


['apple', 'apricot']


In [8]:
data = [
    {"id": 1, "salary": 50000},
    {"id": 2, "salary": 70000},
    {"id": 3, "salary": 30000}
]

rdd = spark.sparkContext.parallelize(data)

high_salary = rdd.filter(lambda row: row["salary"] > 50000)

print(high_salary.collect())


[{'id': 2, 'salary': 70000}]


In [65]:
ordFil = ord.filter( lambda x : int(x.split(',')[0]) <11 and x.split(',')[3] in (['CLOSED', 'COMPLETE']))
ordFil.collect()

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE',
 '6,2013-07-25 00:00:00.0,7130,COMPLETE',
 '7,2013-07-25 00:00:00.0,4530,COMPLETE']

In [71]:
# list the orders for feb 2014 
ordFil = ord.filter(lambda x : x.split(',')[1].split('-')[0] =='2014' and x.split(',')[1].split('-')[1] =='02' )
ordFil.take(20)                    
                    

['30776,2014-02-01 00:00:00.0,11805,COMPLETE',
 '30777,2014-02-01 00:00:00.0,5114,ON_HOLD',
 '30778,2014-02-01 00:00:00.0,5747,COMPLETE',
 '30779,2014-02-01 00:00:00.0,4545,COMPLETE',
 '30780,2014-02-01 00:00:00.0,6560,PROCESSING',
 '30781,2014-02-01 00:00:00.0,9430,PROCESSING',
 '30782,2014-02-01 00:00:00.0,10792,PENDING_PAYMENT',
 '30783,2014-02-01 00:00:00.0,1118,PENDING',
 '30784,2014-02-01 00:00:00.0,730,COMPLETE',
 '30785,2014-02-01 00:00:00.0,1887,COMPLETE',
 '30786,2014-02-01 00:00:00.0,5591,PENDING',
 '30787,2014-02-01 00:00:00.0,11874,COMPLETE',
 '30788,2014-02-01 00:00:00.0,1091,COMPLETE',
 '30789,2014-02-01 00:00:00.0,2313,COMPLETE',
 '30790,2014-02-01 00:00:00.0,9037,PENDING_PAYMENT',
 '30791,2014-02-01 00:00:00.0,1574,CLOSED',
 '30792,2014-02-01 00:00:00.0,1484,PENDING_PAYMENT',
 '30793,2014-02-01 00:00:00.0,7353,PENDING',
 '30794,2014-02-01 00:00:00.0,6196,CLOSED',
 '30795,2014-02-01 00:00:00.0,9871,COMPLETE']

In [74]:
nums = spark.sparkContext.parallelize(range(1, 21))

result = nums.filter(lambda x: x%2==0)

print(result.collect())


[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]


In [12]:
ord.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [14]:
ord.filter(lambda x : int(x.split(',')[0]) <=10).collect()

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE',
 '6,2013-07-25 00:00:00.0,7130,COMPLETE',
 '7,2013-07-25 00:00:00.0,4530,COMPLETE',
 '8,2013-07-25 00:00:00.0,2911,PROCESSING',
 '9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT',
 '10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT']

In [17]:
ord.filter(lambda x : int(x.split(',')[0]) >=10 and int(x.split(',')[0]) <=15 ).collect()

['10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT',
 '11,2013-07-25 00:00:00.0,918,PAYMENT_REVIEW',
 '12,2013-07-25 00:00:00.0,1837,CLOSED',
 '13,2013-07-25 00:00:00.0,9149,PENDING_PAYMENT',
 '14,2013-07-25 00:00:00.0,9842,PROCESSING',
 '15,2013-07-25 00:00:00.0,2568,COMPLETE']

In [18]:
ord.filter( lambda x : x.split(',')[3]=='CLOSED').take(10)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '12,2013-07-25 00:00:00.0,1837,CLOSED',
 '18,2013-07-25 00:00:00.0,1205,CLOSED',
 '24,2013-07-25 00:00:00.0,11441,CLOSED',
 '25,2013-07-25 00:00:00.0,9503,CLOSED',
 '37,2013-07-25 00:00:00.0,5863,CLOSED',
 '51,2013-07-25 00:00:00.0,12271,CLOSED',
 '57,2013-07-25 00:00:00.0,7073,CLOSED',
 '61,2013-07-25 00:00:00.0,4791,CLOSED']

In [20]:
# Print all the orders which are closed or Complete and ordered in the year 2013.
ord.filter(lambda x : x.split(',')[3] in ['CLOSED', 'COMPLETE'] and x.split(',')[1].split('-')[0]).take(10)


['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE',
 '6,2013-07-25 00:00:00.0,7130,COMPLETE',
 '7,2013-07-25 00:00:00.0,4530,COMPLETE',
 '12,2013-07-25 00:00:00.0,1837,CLOSED',
 '15,2013-07-25 00:00:00.0,2568,COMPLETE',
 '17,2013-07-25 00:00:00.0,2667,COMPLETE',
 '18,2013-07-25 00:00:00.0,1205,CLOSED']

In [None]:
# mapValues
# Only applicable to pair RDDs RDD((A,B)).
# Do not change the key. Apply the function ‘f’ to all values of the same key

In [2]:
rdd = spark.sparkContext.parallelize((('A',(10,20,30,40)),('B',(4,10,20)),('A',(10,20,30,40,60))) )
rdd.take(4)

[('A', (10, 20, 30, 40)), ('B', (4, 10, 20)), ('A', (10, 20, 30, 40, 60))]

In [3]:
def f(x):
    return len(x)

In [4]:
rddMapvalue =rdd.mapValues(f)
rddMapvalue.collect()

[('A', 4), ('B', 3), ('A', 5)]

In [11]:
emp = spark.sparkContext.parallelize([(100,'babjee'),(100,'ram'),(101,'sham'),(102,'praveen')])
dept =spark.sparkContext.parallelize([(100,'accts'),(101,'admin'),(103,'marketing'),(104,'hr')])

In [12]:
result= dept.join(emp)
result.collect()

[(100, ('accts', 'babjee')), (100, ('accts', 'ram')), (101, ('admin', 'sham'))]

In [13]:
result =dept.leftOuterJoin(emp)
result.collect()

[(100, ('accts', 'babjee')),
 (100, ('accts', 'ram')),
 (104, ('hr', None)),
 (101, ('admin', 'sham')),
 (103, ('marketing', None))]

In [14]:
result = dept.rightOuterJoin(emp)
result.collect()

[(100, ('accts', 'babjee')),
 (100, ('accts', 'ram')),
 (102, (None, 'praveen')),
 (101, ('admin', 'sham'))]

In [15]:
result = dept.fullOuterJoin(emp)
result.collect()

[(100, ('accts', 'babjee')),
 (100, ('accts', 'ram')),
 (104, ('hr', None)),
 (102, (None, 'praveen')),
 (101, ('admin', 'sham')),
 (103, ('marketing', None))]

In [17]:
ord = spark.sparkContext.textFile('c:/data/Orders')
ordItems = spark.sparkContext.textFile('c:/data/OrderItems')

In [19]:
ord.first()

'1,2013-07-25 00:00:00.0,11599,CLOSED'

In [20]:
ordItems.take(5)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99',
 '5,4,897,2,49.98,24.99']

In [None]:
# : Find the subtotal for each CUSTOMER_ID.


In [21]:
ordMap=ord.map(lambda x : (x.split(',')[0],x.split(',')[2]))


In [22]:
ordItemsMap=ordItems.map(lambda x : (x.split(',')[1],x.split(',')[4]))


In [23]:
findSubtotalForCust = ordMap.join(ordItemsMap)


In [24]:
findSubtotalForCust.take(5)

[('4', ('8827', '49.98')),
 ('4', ('8827', '299.95')),
 ('4', ('8827', '150.0')),
 ('4', ('8827', '199.92')),
 ('5', ('11318', '299.98'))]

In [25]:
for x in findSubtotalForCust.map(lambda x : str(x[1] [0]) +',' + str(x[1] [1]) ).take(5):
    print(x)

8827,49.98
8827,299.95
8827,150.0
8827,199.92
11318,299.98


In [27]:
rdd1 = spark.sparkContext.parallelize([
    ("A", 10),
    ("B", 20),
    ("A", 30)
])

rdd2 = spark.sparkContext.parallelize([
    ("A", 5),
    ("B",10)
])



In [30]:
result = rdd1.cogroup(rdd2)
clean = result.mapValues(lambda x: (list(x[0]), list(x[1])))
print(clean.collect())


[('A', ([10, 30], [5])), ('B', ([20], [10]))]


In [31]:
# cartision 
rdd = spark.sparkContext.parallelize((1,3,2))
rdd1 = spark.sparkContext.parallelize((5,6,7))

sorted(rdd.cartesian(rdd1).collect())




[(1, 5), (1, 6), (1, 7), (2, 5), (2, 6), (2, 7), (3, 5), (3, 6), (3, 7)]

In [23]:
ord = spark.sparkContext.textFile('c:/data/orders')
ordItems = spark.sparkContext.textFile('c:/data/OrderItems')



In [32]:
### Count the number of orders which are closed.
ord.filter(lambda x : x.split(',')[3]=='CLOSED').count()

7556

In [36]:
ordItems.first()

'1,1,957,1,299.98,299.98'

In [39]:
#Find the total quantity sold for Order ID 1-10.
ordItems.filter( lambda x : int(x.split(',')[1] ) <11 ).map(lambda x : int(x.split(',')[3]) ).reduce( lambda x, y: x+y)

62

In [41]:
from operator import add

In [42]:
ordItems.filter( lambda x : int(x.split(',')[1] ) <11 ).map(lambda x : int(x.split(',')[3]) ).reduce(add)

62

In [36]:
ordItems.filter(lambda x : int(x.split(',')[1] ) <11).\
    map(lambda x : int(x.split(',')[3])).reduce( lambda x, y: x+y)

62

In [38]:
from operator import add

In [40]:
ordItems.filter(lambda x : int(x.split(',')[1] ) <11).\
    map(lambda x : int(x.split(',')[3])).reduce(add)

62

In [45]:
ordItems.filter(lambda x : int(x.split(',')[1] ) ==10).collect()

['24,10,1073,1,199.99,199.99',
 '25,10,1014,2,99.96,49.98',
 '26,10,403,1,129.99,129.99',
 '27,10,917,1,21.99,21.99',
 '28,10,1073,1,199.99,199.99']

In [49]:
### For a given order 10 find the maximum subtotal out of all orders.
ordItems.filter(lambda x : int(x.split(',')[1] ) ==10).\
    map(lambda x : float(x.split(',')[4])).reduce(lambda x, y : x if x < y else y)

21.99

In [48]:
ordItems.filter(lambda x : int(x.split(',')[1] ) ==10).\
    map(lambda x : float(x.split(',')[4])).reduce(min)

21.99

In [59]:
# groupByKey
# For each product, find its aggregated revenue.

ordGrp = ordItems.map(lambda x : (int(x.split(',')[2]),float(x.split(',')[4]))).groupByKey()
result = ordGrp.mapValues(sum).collect()
for x in result:
    print(x)

(957, 4118425.4199997648)
(1073, 3099845.0000010305)
(502, 3147800.0)
(403, 2891757.540001228)
(897, 20566.769999999986)
(365, 4421143.020001181)
(1014, 2888993.939999287)
(926, 14870.69999999995)
(191, 3667633.20000043)
(917, 20450.69999999998)
(627, 1269082.6499998174)
(134, 20025.0)
(276, 29398.810000000012)
(1004, 6929653.500002877)
(828, 28982.940000000017)
(810, 16031.97999999994)
(93, 20866.64999999999)
(37, 27327.190000000017)
(906, 22865.849999999995)
(825, 25751.950000000015)
(924, 14151.149999999956)
(886, 21766.289999999983)
(821, 44243.49000000003)
(775, 8871.119999999972)
(572, 35191.20000000005)
(835, 29686.720000000027)
(778, 21116.549999999985)
(565, 67830.0)
(278, 36576.87000000003)
(642, 27840.0)
(135, 19426.0)
(804, 18810.589999999964)
(564, 27210.0)
(792, 12951.359999999946)
(797, 15993.109999999939)
(235, 29601.540000000005)
(905, 22366.04999999999)
(771, 34671.33000000009)
(172, 27210.0)
(822, 38392.00000000001)
(977, 29930.020000000037)
(823, 47206.92000000003)


In [55]:
# reduceByKey
from operator import add
rdd = spark.sparkContext.parallelize((("a", 1), ("b", 1), ("a", 1)))
sorted(rdd.reduceByKey(add).collect())


[('a', 2), ('b', 1)]

In [60]:
### Find the total revenue sold for each order. 
ordItems.map(lambda x : (int(x.split(',')[1]),float(x.split(',')[4]))).reduceByKey(add).collect()


[(1, 299.98),
 (2, 579.98),
 (4, 699.85),
 (5, 1129.8600000000001),
 (7, 579.9200000000001),
 (8, 729.8399999999999),
 (9, 599.96),
 (10, 651.9200000000001),
 (11, 919.79),
 (12, 1299.8700000000001),
 (13, 127.96),
 (14, 549.94),
 (15, 925.9100000000001),
 (16, 419.93),
 (17, 694.84),
 (18, 449.96000000000004),
 (19, 699.96),
 (20, 879.8599999999999),
 (21, 372.91),
 (23, 299.98),
 (24, 829.97),
 (25, 399.98),
 (27, 749.97),
 (28, 1159.9),
 (29, 1109.85),
 (30, 100.0),
 (31, 499.95),
 (33, 659.89),
 (34, 299.98),
 (35, 129.99),
 (36, 799.96),
 (37, 159.95),
 (38, 359.96000000000004),
 (39, 199.99),
 (41, 327.88),
 (42, 739.9200000000001),
 (43, 529.96),
 (44, 399.98),
 (45, 499.84999999999997),
 (46, 229.95),
 (48, 99.96),
 (49, 549.95),
 (50, 429.97),
 (51, 449.98),
 (52, 399.84000000000003),
 (56, 699.89),
 (57, 637.9),
 (58, 799.94),
 (59, 619.95),
 (61, 639.92),
 (62, 1149.94),
 (63, 899.9200000000001),
 (64, 659.97),
 (65, 299.98),
 (66, 679.94),
 (67, 150.0),
 (68, 299.98),
 (69,

In [56]:
ordItems.map(lambda x : (int(x.split(',')[1]),float(x.split(',')[4]))).reduceByKey(add).collect()


[(1, 299.98),
 (2, 579.98),
 (4, 699.85),
 (5, 1129.8600000000001),
 (7, 579.9200000000001),
 (8, 729.8399999999999),
 (9, 599.96),
 (10, 651.9200000000001),
 (11, 919.79),
 (12, 1299.8700000000001),
 (13, 127.96),
 (14, 549.94),
 (15, 925.9100000000001),
 (16, 419.93),
 (17, 694.84),
 (18, 449.96000000000004),
 (19, 699.96),
 (20, 879.8599999999999),
 (21, 372.91),
 (23, 299.98),
 (24, 829.97),
 (25, 399.98),
 (27, 749.97),
 (28, 1159.9),
 (29, 1109.85),
 (30, 100.0),
 (31, 499.95),
 (33, 659.89),
 (34, 299.98),
 (35, 129.99),
 (36, 799.96),
 (37, 159.95),
 (38, 359.96000000000004),
 (39, 199.99),
 (41, 327.88),
 (42, 739.9200000000001),
 (43, 529.96),
 (44, 399.98),
 (45, 499.84999999999997),
 (46, 229.95),
 (48, 99.96),
 (49, 549.95),
 (50, 429.97),
 (51, 449.98),
 (52, 399.84000000000003),
 (56, 699.89),
 (57, 637.9),
 (58, 799.94),
 (59, 619.95),
 (61, 639.92),
 (62, 1149.94),
 (63, 899.9200000000001),
 (64, 659.97),
 (65, 299.98),
 (66, 679.94),
 (67, 150.0),
 (68, 299.98),
 (69,

In [57]:
Find the maximum revenue for each order.

ordItems.map(lambda x : (int(x.split(',') [1]),x)).reduceByKey(lambda a,b : a  if (float(a.split(',')[4]) > float(b.split(',')[4])) else b).collect()


[(1, '1,1,957,1,299.98,299.98'),
 (2, '3,2,502,5,250.0,50.0'),
 (4, '6,4,365,5,299.95,59.99'),
 (5, '12,5,957,1,299.98,299.98'),
 (7, '15,7,957,1,299.98,299.98'),
 (8, '18,8,365,5,299.95,59.99'),
 (9, '23,9,1073,1,199.99,199.99'),
 (10, '28,10,1073,1,199.99,199.99'),
 (11, '32,11,191,4,399.96,99.99'),
 (12, '37,12,191,5,499.95,99.99'),
 (13, '39,13,276,4,127.96,31.99'),
 (14, '40,14,1004,1,399.98,399.98'),
 (15, '47,15,1004,1,399.98,399.98'),
 (16, '49,16,365,5,299.95,59.99'),
 (17, '54,17,365,4,239.96,59.99'),
 (18, '55,18,1073,1,199.99,199.99'),
 (19, '58,19,1004,1,399.98,399.98'),
 (20, '63,20,365,5,299.95,59.99'),
 (21, '65,21,276,4,127.96,31.99'),
 (23, '68,23,957,1,299.98,299.98'),
 (24, '71,24,502,5,250.0,50.0'),
 (25, '74,25,1004,1,399.98,399.98'),
 (27, '77,27,1004,1,399.98,399.98'),
 (28, '78,28,191,4,399.96,99.99'),
 (29, '87,29,1004,1,399.98,399.98'),
 (30, '88,30,502,2,100.0,50.0'),
 (31, '89,31,191,5,499.95,99.99'),
 (33, '91,33,1073,1,199.99,199.99'),
 (34, '94,34,957,1,

In [58]:
ordItems.map(lambda x : (int(x.split(',') [1]),x)).reduceByKey(max).collect()


[(1, '1,1,957,1,299.98,299.98'),
 (2, '4,2,403,1,129.99,129.99'),
 (4, '8,4,1014,4,199.92,49.98'),
 (5, '9,5,957,1,299.98,299.98'),
 (7, '16,7,926,5,79.95,15.99'),
 (8, '20,8,502,1,50.0,50.0'),
 (9, '23,9,1073,1,199.99,199.99'),
 (10, '28,10,1073,1,199.99,199.99'),
 (11, '33,11,1014,5,249.9,49.98'),
 (12, '38,12,502,5,250.0,50.0'),
 (13, '39,13,276,4,127.96,31.99'),
 (14, '42,14,502,1,50.0,50.0'),
 (15, '47,15,1004,1,399.98,399.98'),
 (16, '49,16,365,5,299.95,59.99'),
 (17, '54,17,365,4,239.96,59.99'),
 (18, '57,18,403,1,129.99,129.99'),
 (19, '59,19,957,1,299.98,299.98'),
 (20, '63,20,365,5,299.95,59.99'),
 (21, '67,21,502,2,100.0,50.0'),
 (23, '68,23,957,1,299.98,299.98'),
 (24, '73,24,1073,1,199.99,199.99'),
 (25, '74,25,1004,1,399.98,399.98'),
 (27, '77,27,1004,1,399.98,399.98'),
 (28, '82,28,365,1,59.99,59.99'),
 (29, '87,29,1004,1,399.98,399.98'),
 (30, '88,30,502,2,100.0,50.0'),
 (31, '89,31,191,5,499.95,99.99'),
 (33, '93,33,1014,4,199.92,49.98'),
 (34, '94,34,957,1,299.98,299.

In [62]:
# countByKey  
ordMap = ord.map(lambda x : (x.split(',')[3],1))

countStatus =ordMap.countByKey()
for x,y in countStatus.items():
    print(x, y)

[('CLOSED', 1),
 ('PENDING_PAYMENT', 1),
 ('COMPLETE', 1),
 ('CLOSED', 1),
 ('COMPLETE', 1),
 ('COMPLETE', 1),
 ('COMPLETE', 1),
 ('PROCESSING', 1),
 ('PENDING_PAYMENT', 1),
 ('PENDING_PAYMENT', 1)]

In [64]:
# Sort orders using customer id.
ordPair =  ord.map(lambda x : (int(x.split(',')[2]),x))
ordSort = ordPair.sortByKey(ascending=False)
for i in ordSort.take(10) : print(i)



(12435, '41643,2014-04-08 00:00:00.0,12435,PENDING')
(12435, '61629,2013-12-21 00:00:00.0,12435,CANCELED')
(12434, '1868,2013-08-03 00:00:00.0,12434,CLOSED')
(12434, '4799,2013-08-23 00:00:00.0,12434,PENDING_PAYMENT')
(12434, '5303,2013-08-26 00:00:00.0,12434,PENDING')
(12434, '6160,2013-09-02 00:00:00.0,12434,COMPLETE')
(12434, '13544,2013-10-16 00:00:00.0,12434,PENDING')
(12434, '42915,2014-04-16 00:00:00.0,12434,COMPLETE')
(12434, '51800,2014-06-14 00:00:00.0,12434,ON_HOLD')
(12434, '61777,2013-12-26 00:00:00.0,12434,COMPLETE')


In [65]:
# Sort orders using customer and status.
ordPair =  ord.map(lambda x : ((int(x.split(',')[2]), x.split(',')[3]),x))
ordSort = ordPair.sortByKey(ascending=False)
for i in ordSort.take(10) : print(i)



((12435, 'PENDING'), '41643,2014-04-08 00:00:00.0,12435,PENDING')
((12435, 'CANCELED'), '61629,2013-12-21 00:00:00.0,12435,CANCELED')
((12434, 'PENDING_PAYMENT'), '4799,2013-08-23 00:00:00.0,12434,PENDING_PAYMENT')
((12434, 'PENDING'), '5303,2013-08-26 00:00:00.0,12434,PENDING')
((12434, 'PENDING'), '13544,2013-10-16 00:00:00.0,12434,PENDING')
((12434, 'ON_HOLD'), '51800,2014-06-14 00:00:00.0,12434,ON_HOLD')
((12434, 'COMPLETE'), '6160,2013-09-02 00:00:00.0,12434,COMPLETE')
((12434, 'COMPLETE'), '42915,2014-04-16 00:00:00.0,12434,COMPLETE')
((12434, 'COMPLETE'), '61777,2013-12-26 00:00:00.0,12434,COMPLETE')
((12434, 'CLOSED'), '1868,2013-08-03 00:00:00.0,12434,CLOSED')
