# Computación Avanzada y sus Aplicaciones a Ingeniería

### Máster Universitario en Ingeniería Informática

# Práctica 1 - Parte I - Introducción a la programación en Apache Spark

En esta práctica introducimos las operaciones básicas para trabajar con los RDDs de Spark. Este primer notebook no es más que una guía de todas las operaciones que puedes realizar en Spark. Sigue detenidamente todos los bloques y prueba a cambiar los valores establecidos para comprobar su funcionamiento.

Ten en cuenta que una vez tengas en marcha Spark, podrás visualizar la evolución de cada trabajo de Spark en  <http://localhost:4040>

## **Uso básico de los notebooks y su integración con Python**

### Utilización de un Notebook

Un notebook está compuesto por una serie de celdas. Estas celdas pueden contener texto explicativo o código, pero nunca se mezclan ambas en la misma celda. Cuando ejecutamos una celda de texto, lo que hemos escrito con el lenguaje de markdown se renderiza como texto, imágenes y links (como si fuera HTML). El texto que estás leyendo ahora mismo es parte de una celda de este tipo. Las celdas con código Python te permiten ejecutar comandos de Python como si estuvieras en la consola de Python. Coloca el cursos dentro de la celda de más abajo y presiona "Shift + Enter" para ejecutar el código y avanzar a la siguiente celda. También puedes utilizar "Ctrl + Enter" para ejecutar el código y mantenerte en la misma celda. Estos comandos funcionan tanto en celdas de código como en celdas de texto.

In [1]:
# Esto es una celda ed Python. Puedes ejecutar código Python en estas celdas
print('La suma de 1 y 1 es {0}'.format(1+1))

La suma de 1 y 1 es 2


In [2]:
# Esta es otra celda Python, utiliza una variable x y un if
x = 28
if x > 18:
    print('x es mayor que 18')

x es mayor que 18


### Estado de un Notebook

Cuando trabajas con un notebook es importante ejecutar todas las celdas con código. El notebook tiene estado, lo que quiere decir que las variables y sus valores se mantienen hasta el que el kernel del notebook se reinicia. Si no ejecutas todas las celdas de código a lo largo del notebook, las variables pueden no estar correctamente inicializadas y pueden fallar celdas de código posteriores. También necesitarás reejecutar cualquier celda que hayas modificado para que los cambios estén disponibles en otras celdas

In [3]:
# Esta celda utiliza la variable x que hemos definido en una celda anterior
# Si no ejecutamos la celda anaterior este código fallará
print(x * 2)

56


# Introducción a Spark

En caso de estar utilizando pySpark, **NO** es necesario inicializar el `SparkContext`, es decir, **no** ejecutar la siguiente celda

In [1]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("My App")
sc = SparkContext(conf = conf)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/20 10:45:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/20 10:45:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Creación de un RDD
Podemos crear un RDD de dos formas

1. Cargando un conjunto de datos almacenado en un medio externo: `sc.textFile(fichero)`
2. Distribuyendo una colección de objetos existente: `sc.parallelize(colección_python)`

In [None]:
# Creación de un RDD desde un fichero
quijoteRDD = sc.textFile("./datos/pg2000.txt")

# Creación de un RDD desde una colección
datos = [1, 2, 3, 4, 5]
datoRDD = sc.parallelize(datos)

# Operaciones con RDDs

Podemos realizar dos tipos de operaciones sobre los RDDs
1. Transformaciones: Crean un nuevo RDD a partir de otro - **EVALUACIÓN VAGA (LAZY)** - hasta que no se ejecuta una acción no se realiza la transformación
2. Acciones: Utilizan el RDD para lograr un resultado que es recibido por el driver (o escriben el RDD a disco)

## Transformaciones sobre RDDs

### Transformaciones básicas


Transformación | Descripción
------------- | -------------
*map(func)* | Devuelve un nuevo RDD formado aplicando a cada elemento del RDD original la función func
*filter(func)* | Devuelve un nuevo RDD formado por los elementos para los cuales el aplicarles la función func devuelve true
*distinct()* | Devuelve un nuevo RDD que contiene los elementos distintos dentro del RDD original
*flatMap(func)* | Similar al map, pero cada elemento de entrada puede ser mapeado a 0 o más elementos de salida (por tanto, func devuelve una secuencia en vez de un único elemento)



### `map(func)`

In [None]:
rdd = sc.parallelize([1, 2, 3, 4])
rdd.map(lambda x: x * 2).collect() # Utilizamos collect() para ver el resultado en el driver

### `filter(func)`

In [None]:
rdd.filter(lambda x: x != 1).collect()

In [None]:
rdd.filter(lambda x: x % 2 == 0).collect()

### `distinct()`

In [2]:
rdd = sc.parallelize([1, 4, 2, 2, 3])
rdd.distinct().collect()

                                                                                

[4, 1, 2, 3]

### `map(func)` vs `flatMap(func)`
Es importante diferenciar la función `map`de `flatMap`. `map` devuelve tantos elementos como tiene el RDD original, mientras que en `flatMap` la función debe devolver una lista de elementos (que puede ser vacía o tener más de un elemento) y concatena todas las listas en un único RDD de elementos.

In [None]:
rdd = sc.parallelize([1, 2, 3])
print("map: " + str(rdd.map(lambda x: [x, x+5]).collect()))
print("flatMap: " + str(rdd.flatMap(lambda x: [x, x+5]).collect()))

In [None]:
lines = sc.parallelize(["hello world", "hi", "dime tu nombre", "hasta luego"])

wordsMap = lines.map(lambda line: line.split(" "))
wordsFlatMap = lines.flatMap(lambda line: line.split(" "))

print("map: " + str(wordsMap.collect()))
print("flatMap: " + str(wordsFlatMap.collect()))

### Transformaciones con pseudo-conjuntos
Transformación | Descripción
------------- | -------------
*distinct()* | Devuelve el RDD sin elementos repetidos – ¡Cuidado! Requiere shuffle (enviar datos por red)
*union(rdd)* | Devuelve la unión de los elementos en los dos RDDs  (se mantienen los duplicados)
*intersection(rdd)* | Devuelve la instersección de los elementos en los dos RDDs (elimina los duplicados) – ¡Cuidado! Requiere shuffle (datos por red)
*subtract(rdd)* | Devuelve los elementos presentes en el primer RDD y no en el segundo – ¡Cuidado! También requiere de shuffle
*cartesian(rdd)* | Devuelve un RDD con todos los posibles pares entre elementos de ambos RDDs

In [None]:
rdd1 = sc.parallelize(["agua", "vino", "cerveza", "agua", "agua", "vino"])
rdd2 = sc.parallelize(["cerveza", "cerveza", "agua", "agua", "vino", "coca-cola", "naranjada"])

print("distinct: " + str(rdd1.distinct().collect()))
print("union: " + str(rdd1.union(rdd2).collect()))
print("intersection: " + str(rdd1.intersection(rdd2).collect()))
print("substract: " + str(rdd1.subtract(rdd2).collect()))
print("cartesian: " + str(rdd1.cartesian(rdd2).collect()))

## Acciones sobre RDDs

### Acciones básicas


Acción | Descripción
------------- | -------------
reduce(func) | Agrega los elementos del RDD usando la función func. func toma dos argumentos y devuelve uno, y es conmutativa y asociativa de tal forma que puede calcularse correctamente en paralelo
*take(n)* | Devuelve una lista con los n primeros elementos del RDD
*collect()* | Devuelve todos los elementos del RDD como una lista. **CUIDADO: Hay que asegurarse de que vayan a caber en el driver**
*takeOrdered(n, key=func)* | Devuelve n elementos ordenados de manera ascendente o en el orden especificado por la función de orden opcional func
*foreach(func)* | Aplica la función func a cada elemento del RDD. No devuelve nada, puede valer para realizar inserciones a BBDD por ejemplo
*count()* | Cuenta el número de elementos en el RDD



In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])

prod = rdd.reduce(lambda a, b: a * b)
print("Resultado del producto: " + str(prod))  # función conmutativa y asociativa!!!
print("Dos primeros valores con take(2): " + str( rdd.take(2) ))
print("Todo el RDD con collect(): " + str( rdd.collect() ))

rdd = sc.parallelize([5, 3, 1, 2]) 
print("Los tres elementos más grandes con takeOrdered(3, func): " + str( rdd.takeOrdered(3, lambda s: -1 * s) ))

## Transformaciones clave-valor

### Transformaciones básicas


Transformación | Descripción
------------- | -------------
reduceByKey(func)  | Devuelve un nuevo RDD de tuplas (K, V) donde los valores para cada clave K son agregados usando una función de reducción func, cuyo tipo debe ser (V, V) à V
*sortByKey()* | Devuelve un nuevo RDD de tuplas (K, V) ordenadas por clave en orden ascendente
*groupByKey() * | Devuelve un nuevo RDD de tuplas (K, iterable(V)) **¡Cuidado! Puede ser muy costoso – datos por red**



In [None]:
rdd = sc.parallelize([(1, 2), (3, 4), (3, 6)]) 
print("reduceByKey: " + str( rdd.reduceByKey(lambda a, b: a + b).collect() ))

rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')])
print("reduceByKey: " + str( rdd2.sortByKey().collect() ))
    
rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')]) 
print("reduceByKey: " + str( rdd2.groupByKey().collect() ) )

### Transformaciones Join tipo SQL

Transformación | Descripción
------------- | -------------
join(rdd)  | Inner join entre dos RDDs
*leftOuterJoin(rdd)* | Realiza un join entre los dos RDDs donde la clave debe estar presente en el segundo de ellos
*rightOuterJoin(rdd)* | Realiza un join entre los dos RDDs donde la clave debe estar presente en el primero de ellos
*fullOuterJoin(rdd)* | Realiza un join entre los dos RDDs donde la clave debe estar presente alguno de ellos

### `join`

In [4]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
sorted(x.join(y).collect())

[('a', (1, 2)), ('a', (1, 3))]

### `leftOuterJoin`

In [None]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
sorted(x.leftOuterJoin(y).collect())

### `rightOuterJoin`

In [None]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
sorted(x.rightOuterJoin(y).collect())

### `fullOuterJoin`

In [3]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 8)])
sorted(x.fullOuterJoin(y).collect())

[('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]

## Acciones clave-valor

### Acciones básicas


Acción | Descripción
------------- | -------------
countByKey() | Cuenta el número de elementos para cada clave
*collectAsMap()* | Recolecta el RDD como un Map para facilitar las búsquedas
*lookup(key)* | Devuelve el valor asociado con la clave dada




In [None]:
rdd = sc.parallelize([(1, 2), (3, 4), (3, 6)])

print("countByKey: " + str( rdd.countByKey() ))
print("collectAsMap: " + str( rdd.collectAsMap() ))
print("lookup(3): " + str( rdd.lookup(3) ))

# Caché de RDDs
Si se va a reusar un RDD es conveniente cachearlo para que no se recalcule cada vez

In [None]:
quijoteRDD = sc.textFile("./datos/pg2000.txt")
palabrasQuijoteRDD = quijoteRDD.flatMap(lambda line: line.split(' ')).cache()
print("Cabeza aparece " + str( quijoteRDD.filter(lambda line: "cabeza" in line).count() ) + " veces")
print("Lanza aparece " + str( quijoteRDD.filter(lambda line: "Lanza" in line).count() ) + " veces")

# Aspectos avanzados

## Variables broadcast
Permiten enviar eficientemente valores de gran tamaño a los workers (solo lectura)

In [None]:
tablaLookUp = {1: "a", 2: "b", 3: "c", 4: "d"} # suponer que es muy grande

tablaLookUp[1]

In [None]:
tablaLookUp = {1: "a", 2: "b", 3: "c", 4: "d"} # suponer que es muy grande
rdd = sc.parallelize([1, 2, 3, 4])

sinBroadcast = rdd.map(lambda v: tablaLookUp[v]).collect()

tablaLookUpBroadcast = sc.broadcast(tablaLookUp) # creamos la variable tipo broadcast que se distribuye a los workers
conBroadcast = rdd.map(lambda v: tablaLookUpBroadcast.value[v]).collect() # con .value accedemos al valor de la variable broadcast

print("Sin broadcast: " + str(sinBroadcast))
print("Con broadcast: " + str(conBroadcast))

## Acumuladores
Agregan valores del os executors en el driver. Solo el driver puede leer las variables, para los workers son solo de escritura

In [None]:
accum = sc.accumulator(0)
rdd = sc.parallelize([1, 2, 3, 4])

def f(x):
    global accum 
    accum += x
    
rdd.foreach(f)
accum.value

In [None]:
quijoteRDD = sc.textFile("./datos/pg2000.txt")
#Creamos un Accumulator[Int] inicializado a 0
blankLines = sc.accumulator(0)

def extraePalabrasBlankLines(line):
    global blankLines # Hacemos la variable global accesible
    if (line == ""):
        blankLines += 1
    return line.split(" ")
    
palabrasQuijoteRDD = quijoteRDD.flatMap(extraePalabrasBlankLines)
# Provocamos que se ejecute la transformación
palabrasQuijoteRDD.count()
print("Líneas en blancos: %d" % blankLines.value)

## Trabajar con datos por particiones
Trabajamos con todos los datos de una partición a la vez.
En vez de aplicar una función por elemento se aplica una función para el iterador de elementos de la partición.
Permite evitar rehacer trabajos de configuración con cada elemento o trabajar con todos los elementos a la vez.

Transformación / Acción | Descripción
------------- | -------------
*mapPartitions(func)* | Aplica la función func a cada partición del RDD. La función func recibe un iterador de elementos y devuelve otro iterador con elementos que pueden ser de diferente tipo
*mapPartitionsWithIndex(func)* | Aplica la función func a cada partición del RDD. La función func recibe una tupla (entero, iterador) donde el entero representa el índice de la partición y el iterador contiene todos los elementos de la partición.
*foreachPartition(func)* | Aplica la función func a cada partición del RDD. No devuelve nada. Puede usarse para realizar inserciones en una BBDD por ejemplo. func recibe un iterador de elementos y no devuelve nada.


### Media sin `mapPartitions`

In [None]:
def combineCtrs(c1, c2):
    return (c1[0] + c2[0], c1[1] + c2[1])

def basicAvg(nums):
    """Compute the average"""
    sumCount = nums.map(lambda num: (num, 1)).reduce(combineCtrs)
    return sumCount[0] / float(sumCount[1])

a = sc.parallelize([1, 2, 3, 4, 5, 6, 7])
basicAvg(a)

### Media con `mapPartitions`

In [None]:
def partitionCtr(nums):
    """Compute sumCounter for partition"""
    sumCount = [0, 0]
    for num in nums:
        sumCount[0] += num
        sumCount[1] += 1
    return [sumCount]

def fastAvg(nums):
    """Compute the avg"""
    sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs)
    return sumCount[0] / float(sumCount[1])

a = sc.parallelize([1, 2, 3, 4, 5, 6, 7])
fastAvg(a)

### `mapPartitionsWithIndex`

In [None]:
parallel = sc.parallelize(range(1, 10), 3)
def show(index, iterator): 
    return ['index: ' + str(index) + " values: " + str(list(iterator))] # debemos devolver una lista ya que requiere un iterador

parallel.mapPartitionsWithIndex(show).collect()  


## Operaciones con RDD numéricos



### Método `stats()`
Método `stats()` devuelve un `StatsCounter` con todas las estadísticas calculadas mediante una única pasada por todo el RDD

In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7])
rdd.stats() # devuelve un StatsCounter

### Métodos propios sobre el RDD

Método (acción) | Descripción
------------- | -------------
*Método* | Descripción
*count()* | Número de elementos en el RDD
*mean()* | Media de los elementos en el RDD
*sum()* | Suma total de los elementos en el RDD
*max()* | Máximo valor
*min()* | Mínimo valor
*variance()* | Varianza de los elementos
*sampleVariance()* | Variance de los elementos calculada para una muestra
*stdev()* | Desviación estándar de los elementos
*sampleStdev()* | Desviación estándar para una muestra



In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7])

# ESTA FORMA ES MUCHO MENOS ÓPTIMA QUE STATS() DEBIDO A QUE SE REALIZA UNA PASADA POR EL DATASET PARA CADA ESTADÍSTICA
print("Count: " + str(rdd.count()))
print("Mean: " + str(rdd.mean()))
print("Sum: " + str(rdd.sum()))
print("Max: " + str(rdd.max()))
print("Min: " + str(rdd.min()))
print("Variance: " + str(rdd.variance()))
print("Smaple variance: " + str(rdd.sampleVariance()))
print("Standard deviation: " + str(rdd.stdev()))
print("Sample standard deviation: " + str(rdd.sampleStdev()))

# Lectura y escritura de ficheros

## Ficheros de texto

In [None]:
input1 = sc.textFile("datos/pg2000.txt")
input2 = sc.textFile("datos/")
input3 = sc.textFile("datos/*.txt")
input4 = sc.wholeTextFiles("datos/*.txt")

print("Elementos en RDD a partir de pg2000.txt: " + str(input1.count()))
# print "Elementos en RDD a partir de datos/: " + str(input2.count())    #  datasets muy grandes - mejor no esperar
print("Elementos en RDD a partir de datos/*.txt: " + str(input3.count()))
print("Elementos en RDD a partir de datos/.*txt con wholeTextFiles: " + str(input4.count()))

# CUIDADO, Falla si la carpeta ya existe
input1.saveAsTextFile("datos/salida")
print("Ver datos escritos en datos/salida")

## Ficheros JSON

In [None]:
import json
data = sc.textFile("datos/json.json").map(lambda x: json.loads(x))
print("Elementos en RDD a partir de datos/json.json: " + str(data.count()))


data.map(lambda x: json.dumps(x)).saveAsTextFile("datos/salida.json")
print("Ver datos escritos en datos/salida.json")

## Ficheros CSV

In [None]:
import csv
import StringIO
def loadRecord(line):
    """Parse a CSV line"""
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["nombre", "tel", "email"])
    return reader.next()

inputCSV = sc.textFile("datos/personas.csv").map(loadRecord)
print("Primeras 10 personas: ")
print(str(inputCSV.take(10)))


def writeRecords(records):
    """Write out CSV lines"""
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["nombre", "tel", "email"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]
    
inputCSV.mapPartitions(writeRecords).saveAsTextFile("datos/salida.csv")
print("Ver datos escritos en datos/salida.csv")
