In [0]:
#Inicializar Sparksession y SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext

#Crear sesion de Spark
SpSession = SparkSession.builder.appName("DBA - Spark Acumulators").getOrCreate()
# Se pueden configurar mediante metodo config la memoria entre otras características

# Obtener el Spark context del Spark Session
SpContext = SpSession.sparkContext

## Cargar Datos

In [0]:
# Cargar desde una coleccion
collData = SpContext.parallelize([4,3,8,5,8])
inputPath = "/FileStore/tables/auto_data.csv"
autoData = SpContext.textFile(inputPath)
autoData.cache()

Out[2]: /FileStore/tables/auto_data.csv MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:0

In [0]:
autoData.take(5)

Out[3]: ['MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE',
 'subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118',
 'chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151',
 'mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195',
 'toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348']

## Trabajar con RDD de clave / valor

In [0]:
# crear un KV RDD de auto Brand y Horsepower
cylData = autoData.map(lambda x: (x.split(",")[0], x.split(",")[7]))
print("cylData.take")
for i in cylData.take(5):
    print(i)

cylData.take
('MAKE', 'HP')
('subaru', '69')
('chevrolet', '48')
('mazda', '68')
('toyota', '62')


In [0]:
cylData.keys().collect()

Out[5]: ['MAKE',
 'subaru',
 'chevrolet',
 'mazda',
 'toyota',
 'mitsubishi',
 'honda',
 'nissan',
 'dodge',
 'plymouth',
 'mazda',
 'mitsubishi',
 'dodge',
 'plymouth',
 'chevrolet',
 'toyota',
 'dodge',
 'honda',
 'toyota',
 'honda',
 'chevrolet',
 'nissan',
 'mitsubishi',
 'dodge',
 'plymouth',
 'mazda',
 'isuzu',
 'mazda',
 'nissan',
 'honda',
 'toyota',
 'toyota',
 'mitsubishi',
 'subaru',
 'nissan',
 'subaru',
 'honda',
 'toyota',
 'honda',
 'honda',
 'nissan',
 'nissan',
 'mazda',
 'subaru',
 'nissan',
 'subaru',
 'dodge',
 'plymouth',
 'mitsubishi',
 'toyota',
 'subaru',
 'volkswagen',
 'toyota',
 'nissan',
 'honda',
 'toyota',
 'toyota',
 'dodge',
 'plymouth',
 'volkswagen',
 'volkswagen',
 'nissan',
 'subaru',
 'toyota',
 'mitsubishi',
 'volkswagen',
 'toyota',
 'nissan',
 'toyota',
 'toyota',
 'mazda',
 'volkswagen',
 'mitsubishi',
 'toyota',
 'honda',
 'mazda',
 'dodge',
 'plymouth',
 'toyota',
 'nissan',
 'honda',
 'subaru',
 'toyota',
 'mitsubishi',
 'mitsubishi',
 'toyot

In [0]:
# Eliminar fila de encabezado
header = cylData.first()
cylHPData = cylData.filter(lambda line: line != header)

In [0]:
# Encontrar HP promedio por marca
# Agregar un recuento de 1 a cada registro y luego reducir para encontrar total de HP y recuentos
addOne = cylHPData.mapValues(lambda x: (x,1))
addOne.collect()

Out[16]: [('subaru', ('69', 1)),
 ('chevrolet', ('48', 1)),
 ('mazda', ('68', 1)),
 ('toyota', ('62', 1)),
 ('mitsubishi', ('68', 1)),
 ('honda', ('60', 1)),
 ('nissan', ('69', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('mazda', ('68', 1)),
 ('mitsubishi', ('68', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('chevrolet', ('70', 1)),
 ('toyota', ('62', 1)),
 ('dodge', ('68', 1)),
 ('honda', ('58', 1)),
 ('toyota', ('62', 1)),
 ('honda', ('76', 1)),
 ('chevrolet', ('70', 1)),
 ('nissan', ('69', 1)),
 ('mitsubishi', ('68', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('mazda', ('68', 1)),
 ('isuzu', ('78', 1)),
 ('mazda', ('68', 1)),
 ('nissan', ('69', 1)),
 ('honda', ('76', 1)),
 ('toyota', ('62', 1)),
 ('toyota', ('70', 1)),
 ('mitsubishi', ('88', 1)),
 ('subaru', ('73', 1)),
 ('nissan', ('55', 1)),
 ('subaru', ('82', 1)),
 ('honda', ('76', 1)),
 ('toyota', ('70', 1)),
 ('honda', ('76', 1)),
 ('honda', ('76', 1)),
 ('nissan', ('69', 1)),
 ('nissan', ('69

In [0]:
brandValues = addOne.reduceByKey(lambda x, y: (int(x[0]) + int(y[0]), x[1] + y[1]))
brandValues.collect()

Out[17]: [('chevrolet', (188, 3)),
 ('mazda', (1390, 16)),
 ('mitsubishi', (1353, 13)),
 ('nissan', (1846, 18)),
 ('dodge', (675, 8)),
 ('plymouth', (607, 7)),
 ('saab', (760, 6)),
 ('volvo', (1408, 11)),
 ('alfa-romero', (376, 3)),
 ('mercedes-benz', (1170, 8)),
 ('jaguar', (614, 3)),
 ('subaru', (1035, 12)),
 ('toyota', (2969, 32)),
 ('honda', (1043, 13)),
 ('isuzu', (168, 2)),
 ('volkswagen', (973, 12)),
 ('peugot', (1098, 11)),
 ('audi', (687, 6)),
 ('bmw', (1111, 8)),
 ('mercury', ('175', 1)),
 ('porsche', (764, 4))]

In [0]:
# encontrar promedio dividiendo el total de HP entre el total de conteo
promedio = brandValues.mapValues(lambda x: int(x[0])/int(x[1])).collect()
print("promedio", promedio)

promedio [('chevrolet', 62.666666666666664), ('mazda', 86.875), ('mitsubishi', 104.07692307692308), ('nissan', 102.55555555555556), ('dodge', 84.375), ('plymouth', 86.71428571428571), ('saab', 126.66666666666667), ('volvo', 128.0), ('alfa-romero', 125.33333333333333), ('mercedes-benz', 146.25), ('jaguar', 204.66666666666666), ('subaru', 86.25), ('toyota', 92.78125), ('honda', 80.23076923076923), ('isuzu', 84.0), ('volkswagen', 81.08333333333333), ('peugot', 99.81818181818181), ('audi', 114.5), ('bmw', 138.875), ('mercury', 175.0), ('porsche', 191.0)]


##Acumuladores y variables de difusión (broadcast)
'''
Las variables del acumulador se utilizan para agregar la información a través de operaciones asociativas y conmutativas. Por ejemplo, puede usar un acumulador para una operación de suma o contadores (en MapReduce).

Ua variable de acumulador tiene un atributo llamado valor que es similar a lo que tiene una variable de difusión. Almacena los datos y se utiliza para devolver el valor del acumulador, pero solo se puede usar en un programa de controlador.
'''

In [0]:
# Inicializar acumulador
sedanCount = SpContext.accumulator(0)
hatchbackCount = SpContext.accumulator(0)

In [0]:
print("sedanCount: ", sedanCount)
print("hatchbackCount: ", hatchbackCount)

sedanCount:  0
hatchbackCount:  0


'''
Las variables de difusión se utilzan para guardar la copia de datos en todos los nodos. Esta variable se almacena en caché en todas las máquinas y no se envía en máquinas con tareas. El siguiente bloque de código tiene los detalles de una clase Broadcast para PySpark.

Una variable de difusión tiene un atributo, llamado valor, que almacena los datos y se utiliza para devolver un valor emitido.
'''

In [0]:
# Establecer variable de difusión (broadcast)
sedanText = SpContext.broadcast("sedan")
hatchbackText = SpContext.broadcast("hatchback")

print("sedanText", sedanText)
print("sedanText", sedanText.value)
print("hatchbackText", hatchbackText)
print("hatchbackText", hatchbackText.value)

sedanText <pyspark.broadcast.Broadcast object at 0x7f8941503ee0>
sedanText sedan
hatchbackText <pyspark.broadcast.Broadcast object at 0x7f8941503910>
hatchbackText hatchback


In [0]:
def splitLines(line):
    global sedanCount
    global hatchbackCount
    
    # Usar la variable de difusión para hacer una comparación y configurar el acumulador
    if sedanText.value in line:
        sedanCount += 1
    if hatchbackText.value in line:
        hatchbackCount += 1
    
    return line.split(",")

In [0]:
# Mapear el valor 
splitData = autoData.map(splitLines)
splitData.take(3) 

Out[41]: [['MAKE',
  'FUELTYPE',
  'ASPIRE',
  'DOORS',
  'BODY',
  'DRIVE',
  'CYLINDERS',
  'HP',
  'RPM',
  'MPG-CITY',
  'MPG-HWY',
  'PRICE'],
 ['subaru',
  'gas',
  'std',
  'two',
  'hatchback',
  'fwd',
  'four',
  '69',
  '4900',
  '31',
  '36',
  '5118'],
 ['chevrolet',
  'gas',
  'std',
  'two',
  'hatchback',
  'fwd',
  'three',
  '48',
  '5100',
  '47',
  '53',
  '5151']]

In [0]:
# Haz que ejecute el mapa (lazy execution): no ejecuta sino hasta que lo pida explicitamente
print("splitData.count()", splitData.count())
print("sedanCount", sedanCount)
print("hatchbackCount", hatchbackCount)

splitData.count() 198
sedanCount 92
hatchbackCount 69


## Particiones

In [0]:
collData.getNumPartitions()

Out[43]: 8

In [0]:
# Especificar el no. particiones
collData = SpContext.parallelize([3,5,4,7,4],4)
collData.cache()
conteo = collData.count()
print("conteo: ", conteo)
particiones = collData.getNumPartitions()
print("particiones:", particiones)

conteo:  5
particiones: 4
