# <font color='blue'>Data Science Academy Big Data Real-Time Analytics com Python e Spark</font>

# <font color='blue'>Capítulo 7</font>

****** Este Jupyter Notebook foi atualizado para a versão 3.6.1. da Linguagem Python em 13/06/2017 ******

Acesse http://localhost:4040 sempre que quiser acompanhar a execução dos jobs

## Pares RDD (Pair RDD)

Tipo especial de RDD que armazena pares chave-valor. Útil quando é necessáirio armazenar dados que possuem uma chave e diversos valores (por exemplo, todas as transações de um cliente, geradas em tempo real).

#### mapValues()
#### countByKey()
#### groupByKey()
#### reduceByKey()
#### aggregateByKey()

In [1]:
# Importando arquivo csv e criando um RDD
carros = sc.textFile("carros.csv")

In [2]:
# Criando uma Pair RDD
carrosPairRDD = carros.map(lambda x: (x.split(",")[0], x.split(",")[7]))
carrosPairRDD.take(5)

[('MAKE', 'HP'),
 ('subaru', '69'),
 ('chevrolet', '48'),
 ('mazda', '68'),
 ('toyota', '62')]

In [3]:
# Removendo o cabeçalho
header = carrosPairRDD.first()
carrosPairRDD2 = carrosPairRDD.filter(lambda line: line != header)

In [4]:
# Encontra a média de HP por marca de carro e adiciona 1 a cada contagem
addOne = carrosPairRDD2.mapValues(lambda x: (x, 1))
addOne.collect()

[('subaru', ('69', 1)),
 ('chevrolet', ('48', 1)),
 ('mazda', ('68', 1)),
 ('toyota', ('62', 1)),
 ('mitsubishi', ('68', 1)),
 ('honda', ('60', 1)),
 ('nissan', ('69', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('mazda', ('68', 1)),
 ('mitsubishi', ('68', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('chevrolet', ('70', 1)),
 ('toyota', ('62', 1)),
 ('dodge', ('68', 1)),
 ('honda', ('58', 1)),
 ('toyota', ('62', 1)),
 ('honda', ('76', 1)),
 ('chevrolet', ('70', 1)),
 ('nissan', ('69', 1)),
 ('mitsubishi', ('68', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('mazda', ('68', 1)),
 ('isuzu', ('78', 1)),
 ('mazda', ('68', 1)),
 ('nissan', ('69', 1)),
 ('honda', ('76', 1)),
 ('toyota', ('62', 1)),
 ('toyota', ('70', 1)),
 ('mitsubishi', ('88', 1)),
 ('subaru', ('73', 1)),
 ('nissan', ('55', 1)),
 ('subaru', ('82', 1)),
 ('honda', ('76', 1)),
 ('toyota', ('70', 1)),
 ('honda', ('76', 1)),
 ('honda', ('76', 1)),
 ('nissan', ('69', 1)),
 ('nissan', ('69', 1)),
 

In [5]:
# Aplica redução por key (reduceByKey) e conta a quantidade de fabricantes de carro, a média de HP e o nome do fabricante
fabricantes =  addOne.reduceByKey(lambda x, y: (int(x[0]) + int(y[0]), x[1] + y[1])) 
fabricantes.collect()

[('volvo', (1408, 11)),
 ('saab', (760, 6)),
 ('mercedes-benz', (1170, 8)),
 ('alfa-romero', (376, 3)),
 ('plymouth', (607, 7)),
 ('jaguar', (614, 3)),
 ('nissan', (1846, 18)),
 ('dodge', (675, 8)),
 ('mitsubishi', (1353, 13)),
 ('mazda', (1390, 16)),
 ('chevrolet', (188, 3)),
 ('honda', (1043, 13)),
 ('subaru', (1035, 12)),
 ('porsche', (764, 4)),
 ('mercury', ('175', 1)),
 ('toyota', (2969, 32)),
 ('peugot', (1098, 11)),
 ('isuzu', (168, 2)),
 ('bmw', (1111, 8)),
 ('audi', (687, 6)),
 ('volkswagen', (973, 12))]

In [6]:
# Calculando a média de HP dividindo pela contagem total
fabricantes.mapValues(lambda x: int(x[0])/int(x[1])).collect()

[('volvo', 128.0),
 ('saab', 126.66666666666667),
 ('mercedes-benz', 146.25),
 ('alfa-romero', 125.33333333333333),
 ('plymouth', 86.71428571428571),
 ('jaguar', 204.66666666666666),
 ('nissan', 102.55555555555556),
 ('dodge', 84.375),
 ('mitsubishi', 104.07692307692308),
 ('mazda', 86.875),
 ('chevrolet', 62.666666666666664),
 ('honda', 80.23076923076923),
 ('subaru', 86.25),
 ('porsche', 191.0),
 ('mercury', 175.0),
 ('toyota', 92.78125),
 ('peugot', 99.81818181818181),
 ('isuzu', 84.0),
 ('bmw', 138.875),
 ('audi', 114.5),
 ('volkswagen', 81.08333333333333)]

## Accumulators e Broadcast

O Spark faz uma cópia do código que você escreveu para processar os dados e executa essas cópias, uma por node do cluster. Qualquer variável criada no código é local ao node. O Spark gera cópias dessas variáveis locais, uma em cada node, que agem de forma independente. Mas e se precisamos que a mesma variável seja manipulada de forma única através de todo o cluster? Usamos Acumuladores e Broadcast.

Variável Broadcast - read-only, é compartilhada em todo o cluster.

Variável Accumulator - é compartilhada em todo o cluster, mas pode ser atualizada em cada node do cluster.

In [7]:
# Inicializando variáveis Accumulator
sedanCount = sc.accumulator(0)
hatchbackCount = sc.accumulator(0)

In [8]:
# Inicializando variáveis Broadcast
sedanText = sc.broadcast("sedan")
hatchbackText = sc.broadcast("hatchback")

In [9]:
def splitLines(line) :

    global sedanCount
    global hatchbackCount

    # Usa a variável Broadcast para comparar e configura o accumulator
    if sedanText.value in line:
        sedanCount +=1
    if hatchbackText.value in line:
        hatchbackCount +=1
        
    return line.split(",")

In [10]:
# Map()
splitData = carros.map(splitLines)

In [11]:
# Ação para executar a transformação (lazy evaluation)
splitData.count()
print(sedanCount, hatchbackCount)

92 67


## Partições

Sempre que criamos RDD's, esses objetos são divididos em partições e essas partições são distirbuídas através dos nodes do cluster. Por default, os RDD's são sempre particionados. Essas partições precisam ser configuradas quando se trabalha com grandes clusters.

In [12]:
fabricantes.getNumPartitions()

2

In [13]:
# Especificando o Número de Partições
collData = sc.parallelize([3,5,4,7,4], 3)
collData.cache()
collData.count()

5

In [14]:
collData.getNumPartitions()

3

In [15]:
print (sc.defaultParallelism)

8


# Fim

### Obrigado - Data Science Academy - <a href=http://facebook.com/dsacademy>facebook.com/dsacademybr</a>