## Section 4: Apache Core Spark 1.6 - Transform, Stage and Store

In [4]:
#Reading file from HDFS
! hdfs dfs -ls /user/pi/retail_db

2020-05-17 17:24:33,240 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 9 items
drwxr-xr-x   - pi supergroup          0 2020-05-11 12:06 /user/pi/retail_db/.git
-rw-r--r--   2 pi supergroup        826 2020-05-11 12:05 /user/pi/retail_db/README.md
drwxr-xr-x   - pi supergroup          0 2020-05-11 12:06 /user/pi/retail_db/categories
-rw-r--r--   2 pi supergroup   10303495 2020-05-11 12:06 /user/pi/retail_db/create_db.sql
drwxr-xr-x   - pi supergroup          0 2020-05-11 12:06 /user/pi/retail_db/customers
drwxr-xr-x   - pi supergroup          0 2020-05-11 12:06 /user/pi/retail_db/departments
drwxr-xr-x   - pi supergroup          0 2020-05-11 12:05 /user/pi/retail_db/order_items
drwxr-xr-x   - pi supergroup          0 2020-05-11 12:05 /user/pi/retail_db/orders
drwxr-xr-x   - pi supergroup          0 2020-05-11 12:06 /user/pi/retail_db/products


In [5]:
order_items = sc.textFile("/user/pi/retail_db/order_items")
for order in order_items.take(10):
    print(order)

1,1,957,1,299.98,299.98
2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
4,2,403,1,129.99,129.99
5,4,897,2,49.98,24.99
6,4,365,5,299.95,59.99
7,4,502,3,150.0,50.0
8,4,1014,4,199.92,49.98
9,5,957,1,299.98,299.98
10,5,365,5,299.95,59.99


In [14]:
#Sample code to demostration
#Order Revenue
order_items = sc.textFile('/user/pi/retail_db/order_items')
order_items_PairRDD = order_items.map(lambda order:(order.split(',')[1],float(order.split(',')[4])))
revenue = order_items_PairRDD.reduceByKey(lambda x,y:x+y)
for order,rev in revenue.take(10):
    print("{}-{:.2f}".format(order,rev))

1-299.98
4-699.85
8-729.84
9-599.96
10-651.92
12-1299.87
14-549.94
16-419.93
17-694.84
19-699.96


In [16]:
#to see DAG information
order_items_PairRDD .toDebugString()

b'(2) PythonRDD[48] at RDD at PythonRDD.scala:53 []\n |  /user/pi/retail_db/order_items MapPartitionsRDD[42] at textFile at NativeMethodAccessorImpl.java:0 []\n |  /user/pi/retail_db/order_items HadoopRDD[41] at textFile at NativeMethodAccessorImpl.java:0 []'

In [17]:
#Previewing the data using Actions
order_items_PairRDD.take(10) # display first 10 rows
order_items_PairRDD.collect()  #convert RDD to python list
order_items_PairRDD.count()  #to count number of records
order_items_PairRDD.first() # to display firt row

('1', 299.98)

#### Create RDD using data from collection

In [20]:
#create a list
numList=range(1,100)
numListRDD = sc.parallelize(numList) #to convert list to RDD
numListRDD.count()

99

In [26]:
#Real world use case
#Read data from local file system and convert to RDD
! ls -l retail_db/order_items

total 5452
-rwxr--r-- 1 smbuser smbuser 5581078 May 11 06:20 part-00000


In [37]:
order_items = open("/home/pi/shared/retail_db/order_items/part-00000").read().splitlines()
print(type(order_items))
order_itemsRDD = sc.parallelize(order_items)
print(order_itemsRDD.take(10))
print(type(order_itemsRDD))

<class 'list'>
['1,1,957,1,299.98,299.98', '2,2,1073,1,199.99,199.99', '3,2,502,5,250.0,50.0', '4,2,403,1,129.99,129.99', '5,4,897,2,49.98,24.99', '6,4,365,5,299.95,59.99', '7,4,502,3,150.0,50.0', '8,4,1014,4,199.92,49.98', '9,5,957,1,299.98,299.98', '10,5,365,5,299.95,59.99']
<class 'pyspark.rdd.RDD'>


### Read data from Different File Formats

In [39]:
sc

In [41]:
sqlContext  #created in top of sc

<pyspark.sql.context.SQLContext at 0xb0266ab0>

In [43]:
#read data from json file
iris=sqlContext.read.json('/user/pi/iris.json')
iris.show()

+---------------+-----------+----------+-----------+----------+-------+
|_corrupt_record|petalLength|petalWidth|sepalLength|sepalWidth|species|
+---------------+-----------+----------+-----------+----------+-------+
|              [|       null|      null|       null|      null|   null|
|           null|        1.4|       0.2|        5.1|       3.5| setosa|
|           null|        1.4|       0.2|        4.9|       3.0| setosa|
|           null|        1.3|       0.2|        4.7|       3.2| setosa|
|           null|        1.5|       0.2|        4.6|       3.1| setosa|
|           null|        1.4|       0.2|        5.0|       3.6| setosa|
|           null|        1.7|       0.4|        5.4|       3.9| setosa|
|           null|        1.4|       0.3|        4.6|       3.4| setosa|
|           null|        1.5|       0.2|        5.0|       3.4| setosa|
|           null|        1.4|       0.2|        4.4|       2.9| setosa|
|           null|        1.5|       0.1|        4.9|       3.1| 

In [45]:
iris=sqlContext.load('/user/pi/iris.json','json')
iris.show()

AttributeError: 'SQLContext' object has no attribute 'load'

In [49]:
type(iris)

pyspark.sql.dataframe.DataFrame

### Row Level Transformations

#### String Manupulation

In [3]:
#Action first() converts RDDs first fine to unicode text
#read data from HDFS
! hdfs dfs -ls /user/pi/retail_db

2020-05-18 17:06:04,723 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 9 items
drwxr-xr-x   - pi supergroup          0 2020-05-11 12:06 /user/pi/retail_db/.git
-rw-r--r--   2 pi supergroup        826 2020-05-11 12:05 /user/pi/retail_db/README.md
drwxr-xr-x   - pi supergroup          0 2020-05-11 12:06 /user/pi/retail_db/categories
-rw-r--r--   2 pi supergroup   10303495 2020-05-11 12:06 /user/pi/retail_db/create_db.sql
drwxr-xr-x   - pi supergroup          0 2020-05-11 12:06 /user/pi/retail_db/customers
drwxr-xr-x   - pi supergroup          0 2020-05-11 12:06 /user/pi/retail_db/departments
drwxr-xr-x   - pi supergroup          0 2020-05-11 12:05 /user/pi/retail_db/order_items
drwxr-xr-x   - pi supergroup          0 2020-05-11 12:05 /user/pi/retail_db/orders
drwxr-xr-x   - pi supergroup          0 2020-05-11 12:06 /user/pi/retail_db/products


In [4]:
orders = sc.textFile("/user/pi/retail_db/orders")
s = orders.first()
type(s)

str

In [16]:
print(s)
print(s[:10])
print(s[2:23])
print(len(s))
print(s.split(","))
print(s.split(",")[1])
print(int(s.split(",")[1][:10].replace("-","")))
##convertion function int(),float()

1,2013-07-25 00:00:00.0,11599,CLOSED
1,2013-07-
2013-07-25 00:00:00.0
36
['1', '2013-07-25 00:00:00.0', '11599', 'CLOSED']
2013-07-25 00:00:00.0
20130725


In [18]:
help(str)

Help on class str in module builtins:

class str(object)
 |  str(object='') -> str
 |  str(bytes_or_buffer[, encoding[, errors]]) -> str
 |  
 |  Create a new string object from the given object. If encoding or
 |  errors is specified, then the object must expose a data buffer
 |  that will be decoded using the given encoding and error handler.
 |  Otherwise, returns the result of object.__str__() (if defined)
 |  or repr(object).
 |  encoding defaults to sys.getdefaultencoding().
 |  errors defaults to 'strict'.
 |  
 |  Methods defined here:
 |  
 |  __add__(self, value, /)
 |      Return self+value.
 |  
 |  __contains__(self, key, /)
 |      Return key in self.
 |  
 |  __eq__(self, value, /)
 |      Return self==value.
 |  
 |  __format__(self, format_spec, /)
 |      Return a formatted version of the string as described by format_spec.
 |  
 |  __ge__(self, value, /)
 |      Return self>=value.
 |  
 |  __getattribute__(self, name, /)
 |      Return getattr(self, name).
 |  
 |  

#### map() Transformation

In [20]:
#Return a new distributed dataset formed by passing each element of the source through a function func.
#create a tuple with orderid and revenue
order_items = sc.textFile('/user/pi/retail_db/order_items')
order_items.take(10)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99',
 '5,4,897,2,49.98,24.99',
 '6,4,365,5,299.95,59.99',
 '7,4,502,3,150.0,50.0',
 '8,4,1014,4,199.92,49.98',
 '9,5,957,1,299.98,299.98',
 '10,5,365,5,299.95,59.99']

In [23]:
orderRev = order_items.map(lambda order:(order.split(",")[2],float(order.split(",")[4])))
orderRev.take(10)

[('957', 299.98),
 ('1073', 199.99),
 ('502', 250.0),
 ('403', 129.99),
 ('897', 49.98),
 ('365', 299.95),
 ('502', 150.0),
 ('1014', 199.92),
 ('957', 299.98),
 ('365', 299.95)]

#### flatMap() Transformation

In [26]:
#Map function it returns a single output for each elemnt in RDD but flatMap will flatten all elements to form a single collection.
wordsList = ["how are you","where are you","how was it"]
WordsRDD = sc.parallelize(wordsList)
#Map
WordsMap = WordsRDD.map(lambda line:line.split(" "))
print(WordsMap.take(10))
WordsFlatMap = WordsRDD.flatMap(lambda line:line.split(" "))
print(WordsFlatMap.take(10))

[['how', 'are', 'you'], ['where', 'are', 'you'], ['how', 'was', 'it']]
['how', 'are', 'you', 'where', 'are', 'you', 'how', 'was', 'it']


In [29]:
wordCnt = WordsFlatMap.countByValue()
for word,cnt in wordCnt.items():
    print('{}-{}'.format(word,cnt))

how-2
are-2
you-2
where-1
was-1
it-1


#### filter() Transformation

In [40]:
#Return a new dataset formed by selecting those elements of the source on which func returns true.
#Used to Filter rows from RDD
#Extract all closed and complered order of jan 2014
orders = sc.textFile("/user/pi/retail_db/orders")
print(orders.take(10))
orderFil = orders.filter(lambda order:order.split(",")[3] in ('CLOSED','COMPLETE') and order.split(",")[1][:7] == '2014-01')
print(orderFil.count())
orderFil.take(10)

['1,2013-07-25 00:00:00.0,11599,CLOSED', '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT', '3,2013-07-25 00:00:00.0,12111,COMPLETE', '4,2013-07-25 00:00:00.0,8827,CLOSED', '5,2013-07-25 00:00:00.0,11318,COMPLETE', '6,2013-07-25 00:00:00.0,7130,COMPLETE', '7,2013-07-25 00:00:00.0,4530,COMPLETE', '8,2013-07-25 00:00:00.0,2911,PROCESSING', '9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT', '10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT']
2544


['25882,2014-01-01 00:00:00.0,4598,COMPLETE',
 '25888,2014-01-01 00:00:00.0,6735,COMPLETE',
 '25889,2014-01-01 00:00:00.0,10045,COMPLETE',
 '25891,2014-01-01 00:00:00.0,3037,CLOSED',
 '25895,2014-01-01 00:00:00.0,1044,COMPLETE',
 '25897,2014-01-01 00:00:00.0,6405,COMPLETE',
 '25898,2014-01-01 00:00:00.0,3950,COMPLETE',
 '25899,2014-01-01 00:00:00.0,8068,CLOSED',
 '25900,2014-01-01 00:00:00.0,2382,CLOSED',
 '25901,2014-01-01 00:00:00.0,3099,COMPLETE']

### JOINS 

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
Data should be in (K, V) and (K, W) tuples and returns a dataset of (K, (V, W))
Type of Joins are for 2 RDDs A and B are where A = (K, V) and B= (K, W) then result in format (K, (V, W))
    --> A.join(B)
    --> A.leftOuterJoin(B)
    --> A.rightOuterJoin(B)
    --> A.fullOuterJoin(B)

In [47]:
order_items = sc.textFile("/user/pi/retail_db/order_items")
order_items_PairRDD = order_items.map(lambda order_item:(int(order_item.split(",")[2]),float(order_item.split(",")[4])))
orderRevenue = order_items_PairRDD.reduceByKey(lambda x,y:x+y)
print(orderRevenue.take(10))
orders = sc.textFile("/user/pi/retail_db/orders")
orderStatus = orders.map(lambda order:(int(order.split(",")[2]),order.split(",")[3]))
print(orderStatus.take(10))

[(1014, 2888993.939999649), (502, 3147800.0), (1004, 6929653.499999708), (642, 27840.0), (572, 35191.19999999998), (44, 56330.61000000003), (564, 27210.0), (924, 14151.149999999976), (926, 14870.699999999972), (278, 36576.86999999999)]
[(11599, 'CLOSED'), (256, 'PENDING_PAYMENT'), (12111, 'COMPLETE'), (8827, 'CLOSED'), (11318, 'COMPLETE'), (7130, 'COMPLETE'), (4530, 'COMPLETE'), (2911, 'PROCESSING'), (5657, 'PENDING_PAYMENT'), (5648, 'PENDING_PAYMENT')]


In [56]:
#Inner join
OrderInner = orderRevenue.join(orderStatus)
print(OrderInner.take(10))
#left outer join
Orderleft = orderRevenue.leftOuterJoin(orderStatus)0
print(OrderInner.take(10))
#right Outer join
Orderright = orderRevenue.rightOuterJoin(orderStatus)
print(OrderInner.take(10))
#full outer join
Orderfull = orderRevenue.fullOuterJoin(orderStatus)
print(OrderInner.take(10))

[(652, (7539.419999999991, 'PENDING_PAYMENT')), (652, (7539.419999999991, 'PROCESSING')), (652, (7539.419999999991, 'PENDING')), (652, (7539.419999999991, 'PENDING_PAYMENT')), (728, (61490.0, 'COMPLETE')), (728, (61490.0, 'COMPLETE')), (728, (61490.0, 'COMPLETE')), (728, (61490.0, 'PENDING_PAYMENT')), (728, (61490.0, 'COMPLETE')), (728, (61490.0, 'COMPLETE'))]
[(652, (7539.419999999991, 'PENDING_PAYMENT')), (652, (7539.419999999991, 'PROCESSING')), (652, (7539.419999999991, 'PENDING')), (652, (7539.419999999991, 'PENDING_PAYMENT')), (728, (61490.0, 'COMPLETE')), (728, (61490.0, 'COMPLETE')), (728, (61490.0, 'COMPLETE')), (728, (61490.0, 'PENDING_PAYMENT')), (728, (61490.0, 'COMPLETE')), (728, (61490.0, 'COMPLETE'))]
[(652, (7539.419999999991, 'PENDING_PAYMENT')), (652, (7539.419999999991, 'PROCESSING')), (652, (7539.419999999991, 'PENDING')), (652, (7539.419999999991, 'PENDING_PAYMENT')), (728, (61490.0, 'COMPLETE')), (728, (61490.0, 'COMPLETE')), (728, (61490.0, 'COMPLETE')), (728, (6

### Aggregations

In [16]:
order_items = sc.textFile("/user/pi/retail_db/order_items")
for order_item in order_items.take(10):print(order_item)

1,1,957,1,299.98,299.98
2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
4,2,403,1,129.99,129.99
5,4,897,2,49.98,24.99
6,4,365,5,299.95,59.99
7,4,502,3,150.0,50.0
8,4,1014,4,199.92,49.98
9,5,957,1,299.98,299.98
10,5,365,5,299.95,59.99


#### Action: count()

In [18]:
#Aggregation - total
total_rec = order_items.count()
print(total_rec)

172198


#### Action: reduce()

In [12]:
#Aggregation - total - Get revenue for given order_id
from operator import add
order_item2=order_items.filter(lambda oi:int(oi.split(",")[1])==2)
for order_item in order_item2.take(10):print(order_item)
#method1
order2_rev = order_item2.map(lambda oi:float(oi.split(",")[4])).reduce(add)
print('Total Revnue from Order id : 2 is ',order2_rev)
#method2
order2_rev = order_item2.map(lambda oi:float(oi.split(",")[4])).reduce(lambda x,y:x+y)
print('Total Revnue from Order id : 2 is ',order2_rev)

2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
4,2,403,1,129.99,129.99
Total Revnue from Order id : 2 is  579.98
Total Revnue from Order id : 2 is  579.98


In [15]:
#Aggregation - total - Get order item details  which has minimum  order item subtotal for given order id
order_item2=order_items.filter(lambda oi:int(oi.split(",")[1])==2)
for order_item in order_item2.take(10):print(order_item)
order_item2.reduce(lambda x,y:x if float(x.split(",")[4]) < float(x.split(",")[4]) else y)

2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
4,2,403,1,129.99,129.99


'4,2,403,1,129.99,129.99'

#### Action: countByKey()

In [22]:
#Get Count by status
orders = sc.textFile("/user/pi/retail_db/orders")
for order in orders.take(10):print(order)

1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE
6,2013-07-25 00:00:00.0,7130,COMPLETE
7,2013-07-25 00:00:00.0,4530,COMPLETE
8,2013-07-25 00:00:00.0,2911,PROCESSING
9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT
10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT


In [23]:
ordersPairRdd = orders.map(lambda order:(order.split(",")[3],1))
ordersPairRdd.take(10)

[('CLOSED', 1),
 ('PENDING_PAYMENT', 1),
 ('COMPLETE', 1),
 ('CLOSED', 1),
 ('COMPLETE', 1),
 ('COMPLETE', 1),
 ('COMPLETE', 1),
 ('PROCESSING', 1),
 ('PENDING_PAYMENT', 1),
 ('PENDING_PAYMENT', 1)]

In [32]:
ordersPairRdd.countByKey()

defaultdict(int,
            {'CLOSED': 7556,
             'PENDING_PAYMENT': 15030,
             'COMPLETE': 22899,
             'PROCESSING': 8275,
             'PAYMENT_REVIEW': 729,
             'PENDING': 7610,
             'ON_HOLD': 3798,
             'CANCELED': 1428,
             'SUSPECTED_FRAUD': 1558})

In [27]:
help(ordersPairRdd.countByKey)

Help on method countByKey in module pyspark.rdd:

countByKey() method of pyspark.rdd.PipelinedRDD instance
    Count the number of elements for each key, and return the result to the
    master as a dictionary.
    
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.countByKey().items())
    [('a', 2), ('b', 1)]



In [33]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.countByKey().items())

[('a', 2), ('b', 1)]

#### Combiners

#### groupByKey()

In [41]:
#grouByKey() is least used since it not uses combiner
#Get revenue for each order id
order_items = sc.textFile("/user/pi/retail_db/order_items")
for order_item in order_items.take(10):print(order_item)
orderItemsPairRDD = order_items.map(lambda oi:(int(oi.split(",")[1]),float(oi.split(",")[4])))
print(orderItemsPairRDD.take(10))
orderRevGrpBy = orderItemsPairRDD.groupByKey()
orderRev = orderRevGrpBy.map(lambda oi:(oi[0],round(sum(oi[1]),2)))
for order,rev in orderRev.take(10):
    print('{}-{}'.format(order,rev))

1,1,957,1,299.98,299.98
2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
4,2,403,1,129.99,129.99
5,4,897,2,49.98,24.99
6,4,365,5,299.95,59.99
7,4,502,3,150.0,50.0
8,4,1014,4,199.92,49.98
9,5,957,1,299.98,299.98
10,5,365,5,299.95,59.99
[(1, 299.98), (2, 199.99), (2, 250.0), (2, 129.99), (4, 49.98), (4, 299.95), (4, 150.0), (4, 199.92), (5, 299.98), (5, 299.95)]
35186-249.96
35188-79.98
35190-879.91
35192-120.0
35194-100.0
35196-39.99
35198-939.91
35200-199.99
35206-879.79
35208-889.94


In [63]:
#Get order item details in descending order by revenue
order_items = sc.textFile("/user/pi/retail_db/order_items")
orderItemsPairRDD = order_items.map(lambda oi:(int(oi.split(",")[1]),oi))
orderItemsPairRDDGP = orderItemsPairRDD.groupByKey()
orderItemsSrt = orderItemsPairRDDGP.map(lambda oi:sorted(oi[1],key = lambda lst:float(lst.split(",")[4]),reverse=True))
orderItemsSrt.take(10)

[['3,2,502,5,250.0,50.0',
  '2,2,1073,1,199.99,199.99',
  '4,2,403,1,129.99,129.99'],
 ['6,4,365,5,299.95,59.99',
  '8,4,1014,4,199.92,49.98',
  '7,4,502,3,150.0,50.0',
  '5,4,897,2,49.98,24.99'],
 ['18,8,365,5,299.95,59.99',
  '19,8,1014,4,199.92,49.98',
  '17,8,365,3,179.97,59.99',
  '20,8,502,1,50.0,50.0'],
 ['24,10,1073,1,199.99,199.99',
  '28,10,1073,1,199.99,199.99',
  '26,10,403,1,129.99,129.99',
  '25,10,1014,2,99.96,49.98',
  '27,10,917,1,21.99,21.99'],
 ['37,12,191,5,499.95,99.99',
  '34,12,957,1,299.98,299.98',
  '38,12,502,5,250.0,50.0',
  '36,12,1014,3,149.94,49.98',
  '35,12,134,4,100.0,25.0'],
 ['40,14,1004,1,399.98,399.98',
  '41,14,1014,2,99.96,49.98',
  '42,14,502,1,50.0,50.0'],
 ['49,16,365,5,299.95,59.99', '48,16,365,2,119.98,59.99'],
 ['55,18,1073,1,199.99,199.99',
  '57,18,403,1,129.99,129.99',
  '56,18,365,2,119.98,59.99'],
 ['63,20,365,5,299.95,59.99',
  '60,20,502,5,250.0,50.0',
  '61,20,1014,4,199.92,49.98',
  '62,20,403,1,129.99,129.99'],
 ['71,24,502,5,250.0

In [64]:
orderItemsSrt = orderItemsPairRDDGP.flatMap(lambda oi:sorted(oi[1],key = lambda lst:float(lst.split(",")[4]),reverse=True))
orderItemsSrt.take(10)

['87888,35186,403,1,129.99,129.99',
 '87889,35186,627,3,119.97,39.99',
 '87892,35188,627,2,79.98,39.99',
 '87897,35190,1004,1,399.98,399.98',
 '87894,35190,502,4,200.0,50.0',
 '87895,35190,403,1,129.99,129.99',
 '87896,35190,1014,2,99.96,49.98',
 '87898,35190,1014,1,49.98,49.98',
 '87899,35192,642,4,120.0,30.0',
 '87903,35194,502,2,100.0,50.0']

In [65]:
help(sorted)

Help on built-in function sorted in module builtins:

sorted(iterable, /, *, key=None, reverse=False)
    Return a new list containing all items from the iterable in ascending order.
    
    A custom key function can be supplied to customize the sort order, and the
    reverse flag can be set to request the result in descending order.



#### reduceByKey()

In [71]:
#order items
order_items = sc.textFile("/user/pi/retail_db/order_items")
order_items.take(10)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99',
 '5,4,897,2,49.98,24.99',
 '6,4,365,5,299.95,59.99',
 '7,4,502,3,150.0,50.0',
 '8,4,1014,4,199.92,49.98',
 '9,5,957,1,299.98,299.98',
 '10,5,365,5,299.95,59.99']

In [79]:
#Get revenue for each order id
order_item_pairRDD = order_items.map(lambda oi:(int(oi.split(",")[1]),float(oi.split(",")[4])))
order_item_rev = order_item_pairRDD.reduceByKey(lambda x,y:x+y)
order_item_rev_rnd = order_item_rev.map(lambda tp:(tp[0],round(tp[1],2)))
print(order_item_rev_rnd.take(10))

#method2
from operator import add
order_item_rev = order_item_pairRDD.reduceByKey(add)
order_item_rev_rnd = order_item_rev.map(lambda tp:(tp[0],round(tp[1],2)))
print(order_item_rev_rnd.take(10))

[(35186, 249.96), (35188, 79.98), (35190, 879.91), (35192, 120.0), (35194, 100.0), (35196, 39.99), (35198, 939.91), (35200, 199.99), (35206, 879.79), (35208, 889.94)]
[(2, 579.98), (4, 699.85), (8, 729.84), (10, 651.92), (12, 1299.87), (14, 549.94), (16, 419.93), (18, 449.96), (20, 879.86), (24, 829.97)]


In [81]:
#Get min revenue for each order id
order_item_pairRDD = order_items.map(lambda oi:(int(oi.split(",")[1]),float(oi.split(",")[4])))
order_item_rev = order_item_pairRDD.reduceByKey(lambda x,y:x if x<y else y)
print(order_item_rev.take(10))

[(35186, 119.97), (35188, 79.98), (35190, 49.98), (35192, 120.0), (35194, 100.0), (35196, 39.99), (35198, 50.0), (35200, 199.99), (35206, 74.97), (35208, 100.0)]


In [83]:
#Get order item details with minimum subtotal for each order
order_item_pairRDD = order_items.map(lambda oi:(int(oi.split(",")[1]),oi))
order_item_rev = order_item_pairRDD.reduceByKey(lambda x,y:x if float(x.split(",")[4]) < float(y.split(",")[4]) else y)
print(order_item_rev.take(10))

[(2, '4,2,403,1,129.99,129.99'), (4, '5,4,897,2,49.98,24.99'), (8, '20,8,502,1,50.0,50.0'), (10, '27,10,917,1,21.99,21.99'), (12, '35,12,134,4,100.0,25.0'), (14, '42,14,502,1,50.0,50.0'), (16, '48,16,365,2,119.98,59.99'), (18, '56,18,365,2,119.98,59.99'), (20, '62,20,403,1,129.99,129.99'), (24, '70,24,502,1,50.0,50.0')]


#### aggregateByKey()

In [87]:
#Get Revenue and count of itmes for each order_id
order_item_pairRDD = order_items.map(lambda oi:(int(oi.split(",")[1]),float(oi.split(",")[4])))
order_item_pairRDD.take(10)

[(1, 299.98),
 (2, 199.99),
 (2, 250.0),
 (2, 129.99),
 (4, 49.98),
 (4, 299.95),
 (4, 150.0),
 (4, 199.92),
 (5, 299.98),
 (5, 299.95)]

In [92]:
#Need output in format (2,(579.98,3))
#aggregateByKey()
#param1 - initialize output type variable
#param2 - x data type is (0.0,0) and y data type is 0.0 (revenue) - funtion for combiner
#param3 - data type of both x and y is of (0.0,0)
oiAgg = order_item_pairRDD.aggregateByKey((0.0,0),\
                                         lambda x,y:(x[0]+y,x[1]+1),\
                                         lambda x,y:(x[0]+y[0],x[1],y[1])\
                                         )
oiAgg.take(10)                                        
    

[(2, (579.98, 3)),
 (4, (699.85, 4)),
 (8, (729.8399999999999, 4)),
 (10, (651.9200000000001, 5)),
 (12, (1299.8700000000001, 5)),
 (14, (549.94, 3)),
 (16, (419.93, 2)),
 (18, (449.96000000000004, 3)),
 (20, (879.8599999999999, 4)),
 (24, (829.97, 5))]

In [97]:
#same output using reduceByKey()
order_item_pairRDD = order_items.map(lambda oi:(int(oi.split(",")[1]),(float(oi.split(",")[4]),1)))
rev_cnt = order_item_pairRDD.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
rev_cnt.take(10)

[(2, (579.98, 3)),
 (4, (699.85, 4)),
 (8, (729.8399999999999, 4)),
 (10, (651.9200000000001, 5)),
 (12, (1299.8700000000001, 5)),
 (14, (549.94, 3)),
 (16, (419.93, 2)),
 (18, (449.96000000000004, 3)),
 (20, (879.8599999999999, 4)),
 (24, (829.97, 5))]

### Sorting

#### sortByKey()

In [106]:
#sort data by product price descending
import re

class Utils():
    COMMA_DELIMITER = re.compile(''',(?=(?:[^"]*"[^"]*")*[^"]*$)''')

products = sc.textFile("/user/pi/retail_db/products")
productsPairRDD=products.map(lambda product:(float(Utils.COMMA_DELIMITER.split(product)[4]),product))
productsSorted=productsPairRDD.sortByKey(ascending=False).map(lambda x:x[1])
for products in productsSorted.take(10):
    print(products)

208,10,SOLE E35 Elliptical,,1999.99,http://images.acmesports.sports/SOLE+E35+Elliptical
66,4,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
199,10,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
496,22,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
1048,47,"Spalding Beast 60"" Glass Portable Basketball ",,1099.99,http://images.acmesports.sports/Spalding+Beast+60%22+Glass+Portable+Basketball+Hoop
60,4,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
197,10,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
488,22,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
694,32,Callaway Women's Solaire Gems 20-Piece Comple,,999.99,http://images.acmesports.sports/Callaway+Women%27s+Solaire+Gems+20-Piece+Complete+Set+-...
695,32,Callaway Women's Solaire Gems 20-Piece Comple,,999.99,http://images.acmesports.sports/Ca

In [113]:
#Sort data by CategoryId and then price descending
products = sc.textFile("/user/pi/retail_db/products")
productsPairRDD = products.map(lambda p:((int(Utils.COMMA_DELIMITER.split(p)[1]),-float(Utils.COMMA_DELIMITER.split(p)[4])),p))
productsPairRDD.sortByKey().map(lambda x:x[1]).take(10)
#sortByKey can sort both key in key tuple in ascending order or both in descending order.
#Inorder to sort one in ascending and other in decending ,negative the value for decending key

['16,2,Riddell Youth 360 Custom Football Helmet,,299.99,http://images.acmesports.sports/Riddell+Youth+360+Custom+Football+Helmet',
 '11,2,Fitness Gear 300 lb Olympic Weight Set,,209.99,http://images.acmesports.sports/Fitness+Gear+300+lb+Olympic+Weight+Set',
 '5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet',
 '14,2,Quik Shade Summit SX170 10 FT. x 10 FT. Canop,,199.99,http://images.acmesports.sports/Quik+Shade+Summit+SX170+10+FT.+x+10+FT.+Canopy',
 "12,2,Under Armour Men's Highlight MC Alter Ego Fla,,139.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Alter+Ego+Flash+Football...",
 "23,2,Under Armour Men's Highlight MC Alter Ego Hul,,139.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Alter+Ego+Hulk+Football...",
 "6,2,Jordan Men's VI Retro TD Football Cleat,,134.99,http://images.acmesports.sports/Jordan+Men%27s+VI+Retro+TD+Football+Cleat",
 "2,2,Und

### Ranking

#### Global Ranking using sortByKey and take

In [4]:
#sort data by product price descending
import re
class Utils():
    COMMA_DELIMITER = re.compile(''',(?=(?:[^"]*"[^"]*")*[^"]*$)''')
    
products = sc.textFile("/user/pi/retail_db/products")
prodctsMap = products.map(lambda product:(Utils.COMMA_DELIMITER.split(product)[4],product))
prodctsMap_Sorted = prodctsMap.sortByKey(ascending=False).map(lambda pd:pd[1])
for p in prodctsMap_Sorted.take(10):
    print(p)

694,32,Callaway Women's Solaire Gems 20-Piece Comple,,999.99,http://images.acmesports.sports/Callaway+Women%27s+Solaire+Gems+20-Piece+Complete+Set+-...
695,32,Callaway Women's Solaire Gems 20-Piece Comple,,999.99,http://images.acmesports.sports/Callaway+Women%27s+Solaire+Gems+20-Piece+Complete+Set+-...
60,4,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
197,10,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
488,22,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
709,32,Top Flite Women's Aero Adjustable Driver - Gr,,99.99,http://images.acmesports.sports/Top+Flite+Women%27s+Aero+Adjustable+Driver+-+Grey
710,32,Top Flite Women's Aero Adjustable Driver - Pi,,99.99,http://images.acmesports.sports/Top+Flite+Women%27s+Aero+Adjustable+Driver+-+Pink
719,33,Nike Lunar Cypress Golf Shoes,,99.99,http://images.acmesports.sports/Nike+Lunar+Cypress+Golf+Shoes
733,33,Nike Women's Lunar Empress Golf Shoes,,9

### Global using takeOrdered or top
#--takeOrdered() - sort data in ascending order and first n elements
#--top() - sort data in descending order and first n elements
#--Can be applied on RDD directly,no pairRDD is required
#--Both these functions are Action

In [7]:
#order products by revenue asceding - using takeOrdered()
products = sc.textFile("/user/pi/retail_db/products")
productsBottom10 = products.takeOrdered(10,lambda p:float(Utils.COMMA_DELIMITER.split(p)[4]))
for product in productsBottom10:
    print(product)

38,3,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
388,18,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
414,19,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
517,24,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
547,25,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
934,42,Callaway X Hot Driver,,0.0,http://images.acmesports.sports/Callaway+X+Hot+Driver
1284,57,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
624,29,adidas Batting Helmet Hardware

In [9]:
#order products by revenue descending - using top()
productsTop10 = products.top(10,lambda p:float(Utils.COMMA_DELIMITER.split(p)[4]))
for product in productsTop10:
    print(product)

208,10,SOLE E35 Elliptical,,1999.99,http://images.acmesports.sports/SOLE+E35+Elliptical
66,4,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
199,10,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
496,22,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
1048,47,"Spalding Beast 60"" Glass Portable Basketball ",,1099.99,http://images.acmesports.sports/Spalding+Beast+60%22+Glass+Portable+Basketball+Hoop
60,4,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
197,10,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
488,22,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
694,32,Callaway Women's Solaire Gems 20-Piece Comple,,999.99,http://images.acmesports.sports/Callaway+Women%27s+Solaire+Gems+20-Piece+Complete+Set+-...
695,32,Callaway Women's Solaire Gems 20-Piece Comple,,999.99,http://images.acmesports.sports/Ca

In [11]:
#order products by revenue asceding - using top()
productsBottom10 = products.top(10,lambda p:-float(Utils.COMMA_DELIMITER.split(p)[4]))
for product in productsBottom10:
    print(product)

38,3,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
388,18,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
414,19,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
517,24,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
547,25,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
934,42,Callaway X Hot Driver,,0.0,http://images.acmesports.sports/Callaway+X+Hot+Driver
1284,57,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
624,29,adidas Batting Helmet Hardware

In [17]:
#order products by revenue descending - using takeOrdered()
productsTop10 = products.takeOrdered(10,lambda p:-float(Utils.COMMA_DELIMITER.split(p)[4]))
for product in productsTop10:
    print(product)

208,10,SOLE E35 Elliptical,,1999.99,http://images.acmesports.sports/SOLE+E35+Elliptical
66,4,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
199,10,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
496,22,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
1048,47,"Spalding Beast 60"" Glass Portable Basketball ",,1099.99,http://images.acmesports.sports/Spalding+Beast+60%22+Glass+Portable+Basketball+Hoop
60,4,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
197,10,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
488,22,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
694,32,Callaway Women's Solaire Gems 20-Piece Comple,,999.99,http://images.acmesports.sports/Callaway+Women%27s+Solaire+Gems+20-Piece+Complete+Set+-...
695,32,Callaway Women's Solaire Gems 20-Piece Comple,,999.99,http://images.acmesports.sports/Ca

In [18]:
type(productsTop10)

list

### Rank By Key

#### Get top N products by price per category - Introduction

In [27]:
#Using Python Collection
products = sc.textFile("/user/pi/retail_db/products")
productMap = products.map(lambda p:(Utils.COMMA_DELIMITER.split(p)[1],p))
productMapGp=productMap.groupByKey()
productCat=productMapGp.flatMap(lambda p:sorted(p[1],key=lambda l:Utils.COMMA_DELIMITER.split(l)[4],reverse=True)[:3])
productCat.top(10,lambda p:-int(Utils.COMMA_DELIMITER.split(p)[1]))

['7,2,Schutt Youth Recruit Hybrid Custom Football H,,99.99,http://images.acmesports.sports/Schutt+Youth+Recruit+Hybrid+Custom+Football+Helmet+2014',
 "3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat",
 "4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat",
 '31,3,Nike+ Fuelband SE,,99.0,http://images.acmesports.sports/Nike%2B+Fuelband+SE',
 "26,3,Nike Men's USA White Home Stadium Soccer Jers,,90.0,http://images.acmesports.sports/Nike+Men%27s+USA+White+Home+Stadium+Soccer+Jersey",
 "29,3,Nike Men's USA Away Stadium Replica Soccer Je,,90.0,http://images.acmesports.sports/Nike+Men%27s+USA+Away+Stadium+Replica+Soccer+Jersey",
 '60,4,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical',
 '56,4,Fitbit Flex Wireless Activity & Sleep Wristba,,99.95,http://images.acmesports.sports/Fitbit+Fl

#### Get top N priced products for per catagory - Introduction

In [105]:
from itertools import takewhile

def getTopNProducts(products,topN):
    productsSorted = sorted(products[1],key=lambda p:float(Utils.COMMA_DELIMITER.split(p)[4]),reverse=True)
    TopN_prices = sorted(set(map(lambda p:float(Utils.COMMA_DELIMITER.split(p)[4]),productsSorted)),reverse=True)[:topN]
    return takewhile(lambda l:float(Utils.COMMA_DELIMITER.split(l)[4]) in TopN_prices,productsSorted)

products = sc.textFile("/user/pi/retail_db/products")
prod_catMap = products.map(lambda p:(int(Utils.COMMA_DELIMITER.split(p)[1]),p))
prod_catMap_GP=prod_catMap.groupByKey()
prod_catMap_GP_TopN=prod_catMap_GP.flatMap(lambda l:getTopNProducts(l,3))
prod_catMap_GP_TopN.take(20)

['16,2,Riddell Youth 360 Custom Football Helmet,,299.99,http://images.acmesports.sports/Riddell+Youth+360+Custom+Football+Helmet',
 '11,2,Fitness Gear 300 lb Olympic Weight Set,,209.99,http://images.acmesports.sports/Fitness+Gear+300+lb+Olympic+Weight+Set',
 '5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet',
 '14,2,Quik Shade Summit SX170 10 FT. x 10 FT. Canop,,199.99,http://images.acmesports.sports/Quik+Shade+Summit+SX170+10+FT.+x+10+FT.+Canopy',
 '66,4,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill',
 '60,4,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical',
 '71,4,Diamondback Adult Response XE Mountain Bike 2,,349.98,http://images.acmesports.sports/Diamondback+Adult+Response+XE+Mountain+Bike+2014',
 '117,6,YETI Tundra 65 Chest Cooler,,399.99,http://images.acmesports.sports/YETI+Tundra+65+Chest+Cooler',
 '106,6,Teeter Hang Ups N

### Set Operations

#### Prepare Data
#Requires 2 datasets of same structure

In [117]:
#Extract OrderItesm from 2013-12 and 2014-01
#order_items dataset has no date .So need to join with orders and get data
orders = sc.textFile("/user/pi/retail_db/orders")
orderItems = sc.textFile("/user/pi/retail_db/order_items")
orders201312 = orders.filter(lambda o:o.split(",")[1][:7]=='2013-12').map(lambda o:(int(o.split(',')[0]),o))
orders201401 = orders.filter(lambda o:o.split(",")[1][:7]=='2014-01').map(lambda o:(int(o.split(',')[0]),o))
orderItemsMap = orderItems.map(lambda oi:(int(oi.split(",")[1]),oi))
orderItems201312 = orders201312.join(orderItemsMap).map(lambda oi:oi[1][1])
orderItems201401 = orders201401.join(orderItemsMap).map(lambda oi:oi[1][1])
orderItems201401.take(10)

['154733,61900,1073,1,199.99,199.99',
 '154734,61900,365,5,299.95,59.99',
 '154743,61904,724,5,500.0,100.0',
 '154756,61908,403,1,129.99,129.99',
 '154769,61912,773,1,249.99,249.99',
 '154778,61916,666,1,109.99,109.99',
 '154779,61916,1004,1,399.98,399.98',
 '154780,61916,1014,3,149.94,49.98',
 '154781,61916,1073,1,199.99,199.99',
 '154786,61920,957,1,299.98,299.98']

In [119]:
print(orderItems201312.count())
print(orderItems201401.count())

14729
14666


#### Union and distinct

In [126]:
#get allproduct ids in 2013-12 and 2014-01
productIds201312 = orderItems201312.map(lambda p:int(p.split(",")[2]))
productIds201401 = orderItems201401.map(lambda p:int(p.split(",")[2]))
print(productIds201312.count())
print(productIds201401.count())
allProducts = productIds201312.union(productIds201401)
print(allProducts.count())
allProducts = productIds201312.union(productIds201401).distinct()
print(allProducts.count())

14729
14666
29395
100


#### Intersect and Subtract

In [132]:
#intersect by default gives distinct values
#Product Ids sold in both 2013-12 and 2014-01
commonProducts = productIds201312.intersection(productIds201401)
commonProducts.count()

98

In [135]:
#product ids sold in 2013-12 and not in 2014-01
products201312only = productIds201312.subtract(productIds201401).distinct()
products201312only.count()

1

In [136]:
#product ids sold in 2014-01 and not in 2013-12
products201401only = productIds201401.subtract(productIds201312).distinct()
products201401only.count()

1

In [139]:
#product ids sold only in 2014-01 and only in 2013-12
productsonly = products201312only.union(products201401only)
productsonly.count()

2

### Saving data into HDFS - text file format

In [145]:
#get revenue per order and save to text file
from operator import add
orderItems = sc.textFile("/user/pi/retail_db/order_items")
order_rev = orderItems.map(lambda oi:(int(oi.split(",")[1]),float(oi.split(",")[4])))
orer_revenue = order_rev.reduceByKey(add)
orer_revenue.map(lambda ore:str(ore[0])+'\t'+str(ore[1])).saveAsTextFile('/user/pi/retail_db/order_items_by_revenue')

In [147]:
! hdfs dfs -ls /user/pi/retail_db/order_items_by_revenue

2020-05-20 17:37:30,321 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r--   2 pi supergroup          0 2020-05-20 17:36 /user/pi/retail_db/order_items_by_revenue/_SUCCESS
-rw-r--r--   2 pi supergroup     454338 2020-05-20 17:36 /user/pi/retail_db/order_items_by_revenue/part-00000
-rw-r--r--   2 pi supergroup     454679 2020-05-20 17:36 /user/pi/retail_db/order_items_by_revenue/part-00001


In [150]:
order_items_by_revenue=sc.textFile('/user/pi/retail_db/order_items_by_revenue')
for i in order_items_by_revenue.take(10):print(i)

2	579.98
4	699.85
8	729.8399999999999
10	651.9200000000001
12	1299.8700000000001
14	549.94
16	419.93
18	449.96000000000004
20	879.8599999999999
24	829.97


###  Saving data into HDFS - text file format with compression

In [154]:
#to verify which all compression are configured on cluster open core-site.html
! cat /opt/hadoop/etc/hadoop/core-site.xml
#check for property io.compression.codecs

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://raspberrypi1:9000</value>
</property>
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.De

In [155]:
from operator import add
orderItems = sc.textFile("/user/pi/retail_db/order_items")
order_rev = orderItems.map(lambda oi:(int(oi.split(",")[1]),float(oi.split(",")[4])))
order_revenue = order_rev.reduceByKey(add)
help(order_revenue.saveAsTextFile)

Help on method saveAsTextFile in module pyspark.rdd:

saveAsTextFile(path, compressionCodecClass=None) method of pyspark.rdd.PipelinedRDD instance
    Save this RDD as a text file, using string representations of elements.
    
    @param path: path to text file
    @param compressionCodecClass: (None by default) string i.e.
        "org.apache.hadoop.io.compress.GzipCodec"
    
    >>> tempFile = NamedTemporaryFile(delete=True)
    >>> tempFile.close()
    >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
    >>> from fileinput import input
    >>> from glob import glob
    >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
    '0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'
    
    Empty lines are tolerated when saving to text files.
    
    >>> tempFile2 = NamedTemporaryFile(delete=True)
    >>> tempFile2.close()
    >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
    >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
    '\n\n\n

In [167]:
order_revenue.map(lambda ore:str(ore[0])+'\t'+str(ore[1])).saveAsTextFile('/user/pi/retail_db/order_items_by_revenue_compressed',\
                        compressionCodecClass="org.apache.hadoop.io.compress.SnappyCodec")

Py4JJavaError: An error occurred while calling o4926.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:100)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply$mcV$sp(PairRDDFunctions.scala:1013)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply(PairRDDFunctions.scala:1013)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply(PairRDDFunctions.scala:1013)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1012)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:970)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply(PairRDDFunctions.scala:968)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply(PairRDDFunctions.scala:968)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:968)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply$mcV$sp(RDD.scala:1562)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply(RDD.scala:1550)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply(RDD.scala:1550)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1550)
	at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:558)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 332.0 failed 4 times, most recent failure: Lost task 1.3 in stage 332.0 (TID 718, 192.168.1.111, executor 2): java.lang.RuntimeException: native snappy library not available: this version of libhadoop was built without snappy support.
	at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:65)
	at org.apache.hadoop.io.compress.SnappyCodec.getCompressorType(SnappyCodec.java:134)
	at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:150)
	at org.apache.hadoop.io.compress.CompressionCodec$Util.createOutputStreamWithCodecPool(CompressionCodec.java:131)
	at org.apache.hadoop.io.compress.SnappyCodec.createOutputStream(SnappyCodec.java:100)
	at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:137)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.initWriter(SparkHadoopWriter.scala:230)
	at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:120)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:78)
	... 48 more
Caused by: java.lang.RuntimeException: native snappy library not available: this version of libhadoop was built without snappy support.
	at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:65)
	at org.apache.hadoop.io.compress.SnappyCodec.getCompressorType(SnappyCodec.java:134)
	at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:150)
	at org.apache.hadoop.io.compress.CompressionCodec$Util.createOutputStreamWithCodecPool(CompressionCodec.java:131)
	at org.apache.hadoop.io.compress.SnappyCodec.createOutputStream(SnappyCodec.java:100)
	at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:137)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.initWriter(SparkHadoopWriter.scala:230)
	at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:120)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


### Saving data into HDFS using Data Frames - json

In [186]:
from operator import add
orderItems = sc.textFile("/user/pi/retail_db/order_items")
order_rev = orderItems.map(lambda oi:(int(oi.split(",")[1]),float(oi.split(",")[4])))
order_revenue = order_rev.reduceByKey(add).map(lambda o:(o[0],round(float(o[1]),2)))
order_revenue.take(10)

[(2, 579.98),
 (4, 699.85),
 (8, 729.84),
 (10, 651.92),
 (12, 1299.87),
 (14, 549.94),
 (16, 419.93),
 (18, 449.96),
 (20, 879.86),
 (24, 829.97)]

In [172]:
help(order_revenue.toDF)

Help on method toDF in module pyspark.sql.session:

toDF(schema=None, sampleRatio=None) method of pyspark.rdd.PipelinedRDD instance
    Converts current :class:`RDD` into a :class:`DataFrame`
    
    This is a shorthand for ``spark.createDataFrame(rdd, schema, sampleRatio)``
    
    :param schema: a :class:`pyspark.sql.types.StructType` or list of names of columns
    :param samplingRatio: the sample ratio of rows used for inferring
    :return: a DataFrame
    
    >>> rdd.toDF().collect()
    [Row(name=u'Alice', age=1)]



In [187]:
#save as json file
#convert to DF
order_revenueDF = order_revenue.toDF(schema=["order_id","Revenue"])
order_revenueDF.show()

+--------+-------+
|order_id|Revenue|
+--------+-------+
|       2| 579.98|
|       4| 699.85|
|       8| 729.84|
|      10| 651.92|
|      12|1299.87|
|      14| 549.94|
|      16| 419.93|
|      18| 449.96|
|      20| 879.86|
|      24| 829.97|
|      28| 1159.9|
|      30|  100.0|
|      34| 299.98|
|      36| 799.96|
|      38| 359.96|
|      42| 739.92|
|      44| 399.98|
|      46| 229.95|
|      48|  99.96|
|      50| 429.97|
+--------+-------+
only showing top 20 rows



In [188]:
order_revenueDF.write.json('/user/pi/retail_db/order_items_by_rev_json')
#order_revenueDF.save('/user/pi/retail_db/order_items_by_rev_json',"json")

In [189]:
order_revenueDF = sqlContext.read.json('/user/pi/retail_db/order_items_by_rev_json')
order_revenueDF.show()

+-------+--------+
|Revenue|order_id|
+-------+--------+
| 299.98|       1|
|1129.86|       5|
| 579.92|       7|
| 599.96|       9|
| 919.79|      11|
| 127.96|      13|
| 925.91|      15|
| 694.84|      17|
| 699.96|      19|
| 372.91|      21|
| 299.98|      23|
| 399.98|      25|
| 749.97|      27|
|1109.85|      29|
| 499.95|      31|
| 659.89|      33|
| 129.99|      35|
| 159.95|      37|
| 199.99|      39|
| 327.88|      41|
+-------+--------+
only showing top 20 rows



In [190]:
order_revenueDF.rdd.map(lambda x:(x[0],x[1])).take(10)

[(299.98, 1),
 (1129.86, 5),
 (579.92, 7),
 (599.96, 9),
 (919.79, 11),
 (127.96, 13),
 (925.91, 15),
 (694.84, 17),
 (699.96, 19),
 (372.91, 21)]