Page_No: 50 - 61
# Top N List in each Partition - Unique Keys

### Step 1: Creating an RDD

In [1]:
# Load the CSV file as an RDD of raw text lines (one element per line).
filePath = "./DataSet/top10data.csv"
tTen = sc.textFile(filePath)
# Peek at the first three lines to confirm the "name,value" layout.
tTen.take(3)

['cat1,12', 'cat2,13', 'cat3,14']

### Step 2: Creating a PairRDD

In [2]:
def parsetTen(data):
    '''
    Parse one "key,value" text line into a (key, int value) pair.

    data: a single comma-separated line, e.g. 'cat1,12'.
    Returns a tuple of the first field (as text) and the second field
    converted to int; any further fields are ignored.
    Raises IndexError when the line has no comma and ValueError when the
    second field is not an integer.
    '''
    fields = data.split(',')
    key, count = fields[0], fields[1]
    return (key, int(count))

In [3]:
# Turn every raw text line into a (key, int value) pair.
tPRDD = tTen.map(parsetTen)

In [4]:
# Peek at the first three parsed pairs.
tPRDD.take(3)

[('cat1', 12), ('cat2', 13), ('cat3', 14)]

In [5]:
# glom() groups the elements of each partition into one list; take(3) asks for
# up to three such lists (the output below shows this RDD has only two).
tPRDD.glom().take(3)

[[('cat1', 12),
  ('cat2', 13),
  ('cat3', 14),
  ('cat4', 15),
  ('cat5', 10),
  ('cat100', 100),
  ('cat200', 200),
  ('cat300', 300),
  ('cat1001', 1001)],
 [('cat67', 67),
  ('cat22', 22),
  ('cat23', 23),
  ('cat1000', 1000),
  ('cat2000', 2000),
  ('cat400', 400),
  ('cat500', 500)]]

In [6]:
# Collect every partition as its own list to see how the pairs are distributed.
tPRDD.glom().collect()

[[('cat1', 12),
  ('cat2', 13),
  ('cat3', 14),
  ('cat4', 15),
  ('cat5', 10),
  ('cat100', 100),
  ('cat200', 200),
  ('cat300', 300),
  ('cat1001', 1001)],
 [('cat67', 67),
  ('cat22', 22),
  ('cat23', 23),
  ('cat1000', 1000),
  ('cat2000', 2000),
  ('cat400', 400),
  ('cat500', 500)]]

In [7]:
# sortByKey range-partitions the data: Spark samples the keys to choose the
# partition boundaries, then shuffles the pairs so that each partition holds a
# sorted range of keys. The keys are strings, so the order is lexicographic
# ('cat100' sorts before 'cat2').
tPRDD.sortByKey().glom().collect()

[[('cat1', 12),
  ('cat100', 100),
  ('cat1000', 1000),
  ('cat1001', 1001),
  ('cat2', 13),
  ('cat200', 200),
  ('cat2000', 2000),
  ('cat22', 22),
  ('cat23', 23)],
 [('cat3', 14),
  ('cat300', 300),
  ('cat4', 15),
  ('cat400', 400),
  ('cat5', 10),
  ('cat500', 500),
  ('cat67', 67)]]

##### Before MapPartitions

In [8]:
# Baseline: the contents of each partition before any mapPartitions call.
tPRDD.glom().collect()

[[('cat1', 12),
  ('cat2', 13),
  ('cat3', 14),
  ('cat4', 15),
  ('cat5', 10),
  ('cat100', 100),
  ('cat200', 200),
  ('cat300', 300),
  ('cat1001', 1001)],
 [('cat67', 67),
  ('cat22', 22),
  ('cat23', 23),
  ('cat1000', 1000),
  ('cat2000', 2000),
  ('cat400', 400),
  ('cat500', 500)]]

#### After MapPartitions 
#### 1: Finding Minimum and Maximum Values

In [9]:
# def partitionsFunc(data):
#     '''
#     type(data) = Python map()
#     '''
#     initialData = 0
#     for iter1 in data:
#         if initialData == 0:
#             minD = iter1[1]
#             minKv = iter1
#             maxD = iter1[1]
#             maxKv = iter1
#             initialData = 1
#         else:
#             if iter1[1] > maxD:
#                 maxD = iter1[1]
#                 maxKv = iter1
#             elif iter1[1] < minD:
#                 minD = iter1[1]
#                 minKv = iter1
#     return (maxKv, minKv)

In [10]:
def partitionsFun(data):
    '''
    Return the pairs with the largest and the smallest value in one partition.

    data: iterator over the partition's (key, value) pairs, as handed to
        this function by mapPartitions.
    Returns a tuple (pair_with_max_value, pair_with_min_value). For an empty
    partition an empty tuple is returned, so mapPartitions emits nothing for
    that partition instead of failing.
    '''
    # Materialise the iterator: it can only be walked once, but both max()
    # and min() need a full pass.
    dataList = list(data)
    if not dataList:
        # max()/min() raise ValueError on an empty sequence, and empty
        # partitions are a normal occurrence (e.g. after a filter).
        return ()
    return (max(dataList, key=lambda x: x[1]), min(dataList, key=lambda x: x[1]))

In [11]:
# def partitionsFun(data):
#     '''
#     type(data) = Python map()
#     '''
#     dataList = list(sorted(data, key=lambda x: x[1]))
#     return (dataList[0], dataList[-1])

In [12]:
# Apply partitionsFun once per partition; glom() keeps each partition's
# (max, min) result grouped in its own list.
tPRDD.mapPartitions(partitionsFun).glom().collect()

[[('cat1001', 1001), ('cat5', 10)], [('cat2000', 2000), ('cat22', 22)]]

#### 2: Finding Top 5 List

In [13]:
def topNListAsc(data, n=5):
    '''
    Return the n pairs with the smallest values in one partition.

    data: iterable of (key, value) pairs for a single partition.
    n: how many pairs to keep; defaults to 5.
    Returns a list of at most n pairs ordered by ascending value; pairs with
    equal values keep their original relative order.
    '''
    import heapq  # local import: the notebook has no shared import cell

    # nsmallest is equivalent to sorted(data, key=...)[:n] but avoids fully
    # sorting a large partition just to keep a handful of rows.
    return heapq.nsmallest(n, data, key=lambda x: x[1])

def topNListDesc(data, n=5):
    '''
    Return the n pairs with the largest values in one partition.

    data: iterable of (key, value) pairs for a single partition.
    n: how many pairs to keep; defaults to 5.
    Returns a list of at most n pairs ordered by descending value; pairs with
    equal values keep their original relative order.
    '''
    import heapq  # local import: the notebook has no shared import cell

    # nlargest is equivalent to sorted(data, key=..., reverse=True)[:n] but
    # avoids fully sorting a large partition just to keep a handful of rows.
    return heapq.nlargest(n, data, key=lambda x: x[1])

In [14]:
# The five smallest values within each partition.
tPRDD.mapPartitions(topNListAsc).glom().collect()

[[('cat5', 10), ('cat1', 12), ('cat2', 13), ('cat3', 14), ('cat4', 15)],
 [('cat22', 22),
  ('cat23', 23),
  ('cat67', 67),
  ('cat400', 400),
  ('cat500', 500)]]

In [15]:
# The five largest values within each partition.
tPRDD.mapPartitions(topNListDesc).glom().collect()

[[('cat1001', 1001),
  ('cat300', 300),
  ('cat200', 200),
  ('cat100', 100),
  ('cat4', 15)],
 [('cat2000', 2000),
  ('cat1000', 1000),
  ('cat500', 500),
  ('cat400', 400),
  ('cat67', 67)]]

#### 3: Finding Top N List

In [16]:

# TODO(Vj): verify this on a real cluster. Status: not tested.

# A broadcast variable defines globally shared data that can be read from any
# cluster node (inside mappers, reducers and other transformations); tasks
# read the wrapped value through .value.
nValue = sc.broadcast(2)

In [17]:
def topNListAsc(data):
    '''
    Return the lowest-valued pairs of one partition, ascending by value.

    data: iterable of (key, value) pairs for a single partition.
    The number of pairs kept is read from the broadcast variable nValue.
    '''
    ranked = list(data)
    ranked.sort(key=lambda pair: pair[1])
    return ranked[:nValue.value]

def topNListDesc(data):
    '''
    Return the highest-valued pairs of one partition, descending by value.

    data: iterable of (key, value) pairs for a single partition.
    The number of pairs kept is read from the broadcast variable nValue.
    '''
    ranked = list(data)
    ranked.sort(key=lambda pair: pair[1], reverse=True)
    return ranked[:nValue.value]

In [18]:
# The nValue.value (here 2) smallest values within each partition.
tPRDD.mapPartitions(topNListAsc).glom().collect()

[[('cat5', 10), ('cat1', 12)], [('cat22', 22), ('cat23', 23)]]

### mapPartitions Usage

In [19]:
# Distribute four integers over two partitions (second argument).
lRDD = sc.parallelize([4, 3, 3, 1], 2)

In [20]:
# Number of partitions the RDD is split into.
lRDD.getNumPartitions()

2

##### Inspecting the Partitions
* glom
* foreachPartition

In [21]:
# glom() exposes the partition layout: one inner list per partition.
lRDD.glom().collect()

[[4, 3], [3, 1]]

In [22]:
# The printed output appears in the Spark console, not in the notebook.
def display(data):
    '''Print the elements of one partition as a single list.'''
    partition_items = [*data]
    print(partition_items)
# Call display once for each partition of lRDD.
lRDD.foreachPartition(display)

In [23]:
def sumOfPartitions(data):
    '''
    Yield the sum of one partition's elements as a single value.

    data: iterable of numbers belonging to one partition.
    Written as a generator so it yields exactly one item per partition.
    '''
    partition_total = sum(data)
    yield partition_total

In [24]:
# One sum per partition: [4, 3] -> 7 and [3, 1] -> 4.
lRDD.mapPartitions(sumOfPartitions).collect()

[7, 4]