### Pares RDD (Pair RDD)

- mapValues()
- countByKey()
- groupByKey()
- reduceByKey()
- aggregateByKey()

In [1]:
# Importando arquivo csv e criando um RDD
carros = sc.textFile('carros.csv')

In [2]:
# Criando uma Pair RDD
carrosPairRDD = carros.map(lambda x: (x.split(',')[0], x.split(',')[7]))
carrosPairRDD.take(5)

[('MAKE', 'HP'),
 ('subaru', '69'),
 ('chevrolet', '48'),
 ('mazda', '68'),
 ('toyota', '62')]

In [3]:
# Removendo o cabeçalho
header = carrosPairRDD.first()
carrosPairRDD2 = carrosPairRDD.filter(lambda line: line != header)

In [4]:
# Encontra a média de HP por marca de carro e adiciona 1 a cada contagem
addOne = carrosPairRDD2.mapValues(lambda x: (x, 1))
addOne.collect()

[('subaru', ('69', 1)),
 ('chevrolet', ('48', 1)),
 ('mazda', ('68', 1)),
 ('toyota', ('62', 1)),
 ('mitsubishi', ('68', 1)),
 ('honda', ('60', 1)),
 ('nissan', ('69', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('mazda', ('68', 1)),
 ('mitsubishi', ('68', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('chevrolet', ('70', 1)),
 ('toyota', ('62', 1)),
 ('dodge', ('68', 1)),
 ('honda', ('58', 1)),
 ('toyota', ('62', 1)),
 ('honda', ('76', 1)),
 ('chevrolet', ('70', 1)),
 ('nissan', ('69', 1)),
 ('mitsubishi', ('68', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('mazda', ('68', 1)),
 ('isuzu', ('78', 1)),
 ('mazda', ('68', 1)),
 ('nissan', ('69', 1)),
 ('honda', ('76', 1)),
 ('toyota', ('62', 1)),
 ('toyota', ('70', 1)),
 ('mitsubishi', ('88', 1)),
 ('subaru', ('73', 1)),
 ('nissan', ('55', 1)),
 ('subaru', ('82', 1)),
 ('honda', ('76', 1)),
 ('toyota', ('70', 1)),
 ('honda', ('76', 1)),
 ('honda', ('76', 1)),
 ('nissan', ('69', 1)),
 ('nissan', ('69', 1)),
 

In [5]:
# Aplica redução por key (reduceByKey) e conta a quantidade de fabricantes de carro, a média de HP e o nome do fabricante
fabricantes = addOne.reduceByKey(lambda x, y: (int(x[0]) + int(y[0]), x[1] + y[1]))
fabricantes.collect()

[('chevrolet', (188, 3)),
 ('mazda', (1390, 16)),
 ('mitsubishi', (1353, 13)),
 ('nissan', (1846, 18)),
 ('dodge', (675, 8)),
 ('plymouth', (607, 7)),
 ('saab', (760, 6)),
 ('volvo', (1408, 11)),
 ('alfa-romero', (376, 3)),
 ('mercedes-benz', (1170, 8)),
 ('jaguar', (614, 3)),
 ('subaru', (1035, 12)),
 ('toyota', (2969, 32)),
 ('honda', (1043, 13)),
 ('isuzu', (168, 2)),
 ('volkswagen', (973, 12)),
 ('peugot', (1098, 11)),
 ('audi', (687, 6)),
 ('bmw', (1111, 8)),
 ('mercury', ('175', 1)),
 ('porsche', (764, 4))]

In [6]:
# Calculando a média de HP dividindo pela contagem total
fabricantes.mapValues(lambda x: int(x[0])/int(x[1])).collect()

[('chevrolet', 62.666666666666664),
 ('mazda', 86.875),
 ('mitsubishi', 104.07692307692308),
 ('nissan', 102.55555555555556),
 ('dodge', 84.375),
 ('plymouth', 86.71428571428571),
 ('saab', 126.66666666666667),
 ('volvo', 128.0),
 ('alfa-romero', 125.33333333333333),
 ('mercedes-benz', 146.25),
 ('jaguar', 204.66666666666666),
 ('subaru', 86.25),
 ('toyota', 92.78125),
 ('honda', 80.23076923076923),
 ('isuzu', 84.0),
 ('volkswagen', 81.08333333333333),
 ('peugot', 99.81818181818181),
 ('audi', 114.5),
 ('bmw', 138.875),
 ('mercury', 175.0),
 ('porsche', 191.0)]

### Acumuladores e Broadcast

In [7]:
# Inicializando variáveis Accumulator
sedanCount = sc.accumulator(0)
hatchbackCount = sc.accumulator(0)

In [8]:
# Inicializando variáveis Broadcast
sedanText = sc.broadcast('sedan')
hatchbackText = sc.broadcast('hatchback')

In [9]:
def splitLines(line):
    
    global sedanCount
    global hatchbackCount
    
    # Usa a variável Broadcast para comparar e configura o accumulator
    if sedanText.value in line:
        sedanCount +=1
    if hatchbackText.value in line:
        hatchbackCount +=1
        
    return line.split(',')

In [11]:
# Map()
splitData = carros.map(splitLines)

In [12]:
# Ação para executar a transformação (lazy evaluation)
splitData.count()
print(sedanCount, hatchbackCount)

92 67


### Partições

In [13]:
fabricantes.getNumPartitions()

2

In [14]:
# Especificando o número de partições
collData = sc.parallelize([3,5,4,7,4], 3)
collData.cache()
collData.count()

5

In [15]:
collData.getNumPartitions()

3

In [16]:
print(sc.defaultParallelism)

8
