## Pair RDDs
A pair RDD is a Spark data structure whose elements are (key, value) pairs; the value may itself hold several fields.
Example:

- key: a person
- value: that person's investment contributions, that person's accounts payable, etc.

In [12]:
file = 'C:\\Users\\whitecube.daniel\\Projetos_Daniel\\DistributedComputing\\Datasets\\cars.csv'
carRDD = sc.textFile(file)  # one RDD element per line of the CSV file
print('Printing the first 5 lines of the imported RDD...')
carRDD.take(5)

Printing the first 5 lines of the imported RDD...


['MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE',
 'subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118',
 'chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151',
 'mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195',
 'toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348']

#### Example of a PAIR (key, value)

In [43]:
print('Splitting the RDD columns on "," and mapping selected fields...\n')
# Column 0 is MAKE and column 7 is HP (see the header row above)
carRDD1 = carRDD.map(lambda x : (x.split(',')[0], x.split(',')[7]))
print('Printing the first 5 lines of the transformed RDD...')
carRDD1.take(5)

Splitting the RDD columns on "," and mapping selected fields...

Printing the first 5 lines of the transformed RDD...


[('MAKE', 'HP'),
 ('subaru', '69'),
 ('chevrolet', '48'),
 ('mazda', '68'),
 ('toyota', '62')]
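
The lambda above splits each line twice. A minimal plain-Python sketch of the same mapping that parses each line only once is shown below; it needs no Spark, it assumes the column layout from the header row, and `to_pair`, `MAKE_COL` and `HP_COL` are hypothetical names introduced here for illustration.

```python
# Column positions taken from the header row: MAKE is column 0, HP is column 7
MAKE_COL = 0
HP_COL = 7

def to_pair(line):
    """Split one CSV line once and return a (make, hp) pair of strings."""
    fields = line.split(',')
    return (fields[MAKE_COL], fields[HP_COL])

# One data row copied from the take(5) output above
sample = 'subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118'
pair = to_pair(sample)
print(pair)  # ('subaru', '69')
```

With Spark, the same function could presumably be passed directly to the transformation, as in `carRDD.map(to_pair)`.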

#### Removing the header

In [44]:
header = carRDD1.first()  # the first element is the ('MAKE', 'HP') header pair
carRDD2 = carRDD1.filter(lambda x : x != header)
print('Printing the first 5 lines of the transformed RDD...')
carRDD2.take(5)

Printing the first 5 lines of the transformed RDD...


[('subaru', '69'),
 ('chevrolet', '48'),
 ('mazda', '68'),
 ('toyota', '62'),
 ('mitsubishi', '68')]

#### Mapping the values for each row of the RDD

In [52]:
# Equivalent with map: carRDD3 = carRDD2.map(lambda x : (x[0], (x[1], 1)))
# mapValues is the more "elegant" way: it transforms only the value and leaves the key untouched
carRDD3 = carRDD2.mapValues(lambda x : (x, 1))
carRDD3.take(5)

[('subaru', ('69', 1)),
 ('chevrolet', ('48', 1)),
 ('mazda', ('68', 1)),
 ('toyota', ('62', 1)),
 ('mitsubishi', ('68', 1))]

#### Reducing by KEY

In [56]:
# For each key, add the HP values (converted from str to int) and the counters, element by element
carRDD4 = carRDD3.reduceByKey(lambda value_1, value_2 : (int(value_1[0]) + int(value_2[0]), int(value_1[1]) + int(value_2[1])))
carRDD4.take(10)

[('chevrolet', (188, 3)),
 ('mazda', (1390, 16)),
 ('mitsubishi', (1353, 13)),
 ('nissan', (1846, 18)),
 ('dodge', (675, 8)),
 ('plymouth', (607, 7)),
 ('saab', (760, 6)),
 ('volvo', (1408, 11)),
 ('alfa-romero', (376, 3)),
 ('mercedes-benz', (1170, 8))]
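
To make the reduction step concrete, here is a toy, single-machine stand-in for `reduceByKey` written in plain Python. It is only a sketch of the semantics, not how Spark implements it; `reduce_by_key` and `merge` are hypothetical names, and the input rows are made up for illustration (they are not taken from cars.csv).

```python
from functools import reduce
from itertools import groupby

def reduce_by_key(pairs, func):
    """Toy stand-in for reduceByKey: merge all values that share a key."""
    ordered = sorted(pairs, key=lambda kv: kv[0])
    return [
        (key, reduce(func, (value for _, value in group)))
        for key, group in groupby(ordered, key=lambda kv: kv[0])
    ]

def merge(value_1, value_2):
    """Add HP totals and counters element by element."""
    return (int(value_1[0]) + int(value_2[0]), value_1[1] + value_2[1])

# Made-up rows for illustration only (not taken from cars.csv)
rows = [('mazda', ('68', 1)), ('toyota', ('62', 1)),
        ('mazda', ('101', 1)), ('mazda', ('84', 1))]
totals = reduce_by_key(rows, merge)
print(totals)  # [('mazda', (253, 3)), ('toyota', ('62', 1))]
```

Note that a key with a single row is never passed through the merge function, so its HP total stays a string. The same holds for `reduceByKey` in Spark, which is why a later averaging step still needs to convert the total with `float()`.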

#### Finally, computing the average HP per car make

In [71]:
# average = total HP / number of cars, rounded to 2 decimal places
carRDD4.mapValues(lambda x : round(float(x[0]) / float(x[1]), 2)).collect()

[('chevrolet', 62.67),
 ('mazda', 86.88),
 ('mitsubishi', 104.08),
 ('nissan', 102.56),
 ('dodge', 84.38),
 ('plymouth', 86.71),
 ('saab', 126.67),
 ('volvo', 128.0),
 ('alfa-romero', 125.33),
 ('mercedes-benz', 146.25),
 ('jaguar', 204.67),
 ('subaru', 86.25),
 ('toyota', 92.78),
 ('honda', 80.23),
 ('isuzu', 84.0),
 ('volkswagen', 81.08),
 ('peugot', 99.82),
 ('audi', 114.5),
 ('bmw', 138.88),
 ('mercury', 175.0),
 ('porsche', 191.0)]
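
As a quick arithmetic check of the averaging step, the sketch below recomputes three of the averages in plain Python from the (total HP, count) pairs shown in the reduceByKey output earlier. The helper name `average` is hypothetical.

```python
def average(total_and_count):
    """Return total / count rounded to 2 decimal places."""
    total, count = total_and_count
    return round(float(total) / float(count), 2)

# (total HP, number of cars) pairs copied from the reduceByKey output
totals = [('chevrolet', (188, 3)), ('mazda', (1390, 16)), ('volvo', (1408, 11))]
averages = [(make, average(value)) for make, value in totals]
print(averages)  # [('chevrolet', 62.67), ('mazda', 86.88), ('volvo', 128.0)]
```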

#### Accumulators and Broadcast Variables

By default, Spark ships each task to the cluster nodes together with a copy of every variable the task references. Each node then works on its own local copy, so the nodes act independently and changes made on one node are not seen by the other nodes or by the driver.

When a variable needs to be shared across the whole cluster, use one of the following:

- ACCUMULATORS -> a variable that tasks on every node can add to; the partial updates are merged, and only the driver program can read the resulting value.

- BROADCAST -> every node receives the value(s) of the variable, which is read-only in order to preserve integrity.
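
The difference between an ordinary variable and an accumulator can be illustrated with a toy model in plain Python. This is only a sketch of the idea, not Spark code or Spark internals: the two list slices stand in for partitions on different nodes, `run_task` is a hypothetical name, and the input lines are made up.

```python
import copy

# Made-up lines for illustration only
lines = ['a,sedan', 'b,hatchback', 'c,sedan', 'd,sedan']
partitions = [lines[:2], lines[2:]]  # pretend each slice runs on a different node

def run_task(partition, counter):
    """Count 'sedan' lines using the task's own private copy of the counter."""
    local = copy.deepcopy(counter)  # each task works on a copy, like a shipped variable
    for line in partition:
        if 'sedan' in line:
            local['n'] += 1
    return local['n']

driver_counter = {'n': 0}
deltas = [run_task(p, driver_counter) for p in partitions]

# The ordinary variable on the driver was never touched by the tasks
print(driver_counter['n'])  # 0

# An accumulator behaves as if the per-task updates were merged on the driver
accumulated = sum(deltas)
print(accumulated)  # 3
```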

#### Initializing the accumulators

In [77]:
acc1 = sc.accumulator(0)
acc2 = sc.accumulator(0)

#### Initializing the broadcast variables

In [78]:
broad1 = sc.broadcast("sedan")
broad2 = sc.broadcast("hatchback")

#### Function that adds 1 to the corresponding ACCUMULATOR for each occurrence of the words declared in the BROADCAST variables ("sedan" and "hatchback")

In [140]:
def split(inputvalue):
    # "+=" rebinds the name, so the accumulators must be declared GLOBAL for the function to update the ones defined above.
    global acc1
    global acc2

    if broad1.value in inputvalue: # each time the value of broad1.value is found in the line, add 1
        acc1 += 1
    if broad2.value in inputvalue: # each time the value of broad2.value is found in the line, add 1
        acc2 += 1

    return inputvalue.split(',')

In [146]:
carRDD5 = carRDD.map(split)

In [145]:
carRDD5.count()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 107.0 failed 1 times, most recent failure: Lost task 0.0 in stage 107.0 (TID 151, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 177, in main
  File "C:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 172, in process
  File "C:\Spark\python\pyspark\rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\Spark\python\pyspark\rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\Spark\python\pyspark\rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\Spark\python\pyspark\rdd.py", line 346, in func
    return f(iterator)
  File "C:\Spark\python\pyspark\rdd.py", line 1041, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "C:\Spark\python\pyspark\rdd.py", line 1041, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "<ipython-input-140-8200ea903ef9>", line 9, in split
TypeError: unsupported operand type(s) for +: 'Accumulator' and 'int'


In [147]:
print('For body style "', broad1.value, '" the count was', acc1, 'cars.')
print('For body style "', broad2.value, '" the count was', acc2, 'cars.')

For body style " sedan " the count was 0 cars.
For body style " hatchback " the count was 0 cars.
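
The zeros printed above are consistent with how Spark evaluates transformations: `map` is lazy, and accumulator updates reach the driver only when an action completes successfully, whereas here the `count()` call failed. The `TypeError` in the traceback also mentions `+`, which is what an expression such as `acc1 = acc1 + 1` would raise; PySpark's `Accumulator` supports `+=` and `.add()`, so the traceback may come from an earlier version of the cell. The plain-Python analogy below is not Spark code; it relies on the built-in `map` also being lazy, and `split_and_count` and `counts` are hypothetical names. It shows the counters staying at zero until the mapped data is actually consumed.

```python
counts = {'sedan': 0, 'hatchback': 0}

def split_and_count(line):
    """Increment the counter of every body style found in the line, then split it."""
    for body in counts:
        if body in line:
            counts[body] += 1
    return line.split(',')

# Data rows copied from the take(5) output at the top of this section
lines = [
    'subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118',
    'chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151',
    'mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195',
    'toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348',
]

lazy = map(split_and_count, lines)  # nothing has run yet
before = dict(counts)
print(before)  # {'sedan': 0, 'hatchback': 0}

rows = list(lazy)  # consuming the iterator plays the role of a Spark action
after = dict(counts)
print(after)  # {'sedan': 0, 'hatchback': 4}
```

In the notebook, re-running an action such as `carRDD5.count()` after redefining `carRDD5`, and only then printing the accumulators, would be the corresponding step.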
