# Arquitecturas Cloud y Big Data
#
### Modulo 6 - Dia 3
### 1. RDDs: Transformaciones

##### Transformación filter() : 
Filtra un RDD manteniendo solo los elementos que cumplan una condición.

In [0]:
# Crear RDD 
numeros = sc.parallelize([1,2,3,4,5])

# Aplicar transformación filter()
pares_rdd = numeros.filter(lambda elemento: elemento%2 == 0)

# Visualizar elemento RDD pares_rdd
pares_rdd.collect()

Out[1]: [2, 4]

In [0]:
# Cuestiones sobre filter()
    # ¿Cuál es el tamaño del rdd de salida?

# Crear RDD 
log = sc.parallelize(['E: e21', 'W: w12', 'W: w13', 'E: e45'])

# Aplicar transformación filter()
errors = log.filter(lambda elemento: elemento[0] == 'E')

# Visualizar elemento RDD
errors.collect()

Out[2]: ['E: e21', 'E: e45']

##### Transformación flatMap() : 
Como map pero por cada elemento puede crear más elementos.

In [0]:
# Crear RDD 
numeros = sc.parallelize([1,2,3,4,5])

# Aplicar transformación flatMap()
rdd = numeros.flatMap(lambda elemento: [elemento, 10 * elemento])

# Visualizar elemento RDD
rdd.collect()

Out[3]: [1, 10, 2, 20, 3, 30, 4, 40, 5, 50]

In [0]:
# Cuestiones sobre flatMap()
    # ¿Cuántos elementos tendrá el RDD de salida?

# Crear RDD 
lineas = sc.parallelize(['', 'a', 'ab', 'abc'])

# Aplicar transformación flatMap()
palabras = lineas.flatMap(lambda elemento: elemento.split())

# Visualizar elemento RDD
palabras.collect()

Out[4]: ['a', 'ab', 'abc']

In [0]:
# Diferencia entre flaMap() y map()

# Crear RDD 
lineas = sc.parallelize(['','a','ab','abc'])

# Aplicar transformación flatMap()
palabras_flat = lineas.flatMap(lambda elemento: elemento.split())

# Aplicar transformación map()
palabras_map = lineas.map(lambda elemento: elemento.split())

# Visualizar elemento RDD
palabras_map.collect()

Out[5]: [[], ['a'], ['ab'], ['abc']]

##### Transformación distinct() : 
Crea un nuevo RDD eliminando duplicados.

In [0]:
# Crear RDD 
numeros = sc.parallelize([1,1,2,2,5])

# Aplicar transformación distinct()
unicos = numeros.distinct()

# Visualizar elemento RDD
unicos.collect()

Out[6]: [1, 2, 5]

##### Transformación sample() : 

- Remuestrea el RDD de entrada con o sin reemplazamiento.
- El primer parámetro indica si hay reemplazamiento.
- El segundo parámetro indica la fracción de datos aproximados que se seleccionan.

In [0]:
# Crear RDD
numeros =sc.parallelize([1,2,3,4,5,6,7,8,9,10])

# Aplicar transformación sample()
muestra = numeros.sample(False, 0.5)

# Visualizar elemento RDD
muestra.collect()

Out[43]: [1, 3, 6, 7, 9]

##### Transformación union() : 
Une dos RDDs en uno

In [0]:
# Crear RDD
pares = sc.parallelize([2,4,6,8,10])
impares = sc.parallelize([1,3,5,7,9])

# Aplicar transformación union()
numeros = pares.union(impares)

# Visualizar elemento RDD
numeros.collect()

Out[11]: [2, 4, 6, 8, 10, 1, 3, 5, 7, 9]

In [0]:
# Transformación “union()”: ejemplo de uso sencillo

# Crear RDD 
log = sc.parallelize(['E: e21', 'I: i11', 'W: w12', 'I: i11', 'W: w13', 'E: e45'])

# Aplicar transformaciones filter() y union()
info = log.filter(lambda elemento:    elemento[0] == 'I')
errs = log.filter(lambda elemento:    elemento[0] == 'E')
inferr = info.union(errs)

# Visualizar elemento RDD
inferr.collect()

Out[13]: ['I: i11', 'I: i11', 'E: e21', 'E: e45']

### 2. RDDs: Acciones

#### RDDs: Acciones 
#
- Devuelven un resultado.
- Desencadena la ejecución de toda la secuencia de RDD necesarios para calcular lo requerido.
- Ejecutala "receta”.
#
#### RDDs: Acciones más comunes
#
- count()
- reduce()
- take()
- collect()
- takeOrdered(n[,key=func])
#
A continuación, se explicaran cada una de estas acciones.

##### Acción count() : 

- Devuelve el número de elementos del RDD.

In [0]:
# Crear RDD
numeros = sc.parallelize([1,2,3,4,5,6,7,8,9,10])

# Aplicar transformación filter()
pares = numeros.filter(lambda elemento: elemento%2 == 0)

# Aplicar accion count()
pares.count()


Out[14]: 5

##### Acción reduce() : 

- Agrega todos los elementos del RDD por pares hasta obtener un único valor (expresión lambda).

In [0]:
# Crear RDD
numeros = sc.parallelize([1,2,3,4,5])

# Imprimir resultado y aplicar acción reduce
print(numeros.reduce(lambda elem1, elem2: elem1 + elem2))

15


In [0]:
# otro ejemplo de acción reduce()

# Crear RDD
palabras = sc.parallelize(['HOLA', 'Que', 'TAL', 'Bien'])

# Aplicar transformación map() y acción reduce()
pal_minus = palabras.map(lambda elemento: elemento.lower())
print(pal_minus.reduce(lambda elem1, elem2: elem1 + "-" + elem2))

hola-que-tal-bien


##### Acción take() : 

- Devuelve una lista con los primeros n elementos del RDD.

In [0]:
# Crear RDD
numeros = sc.parallelize([5,3,2,1,4])

# Aplicar acción take() para los primero 3 numeros
print(numeros.take(3))

[5, 3, 2]


##### Acción collect() : 

- Devuelve una lista con todos los elementos del RDD.

In [0]:
# Crear RDD
numeros = sc.parallelize([5,3,2,1,4])

# Aplicar acción collect e imprimir lista de elementos
print(numeros.collect())

[5, 3, 2, 1, 4]


##### Acción takeOrdered() : 

- Devuelve una lista con los primeros n elementos del RDD en orden ascendente.

In [0]:
# Crear RDD
numeros = sc.parallelize([3,2,1,4,5])

# Aplicar acción takeOrdered e imprimir lista en orden ascendente
print(numeros.takeOrdered(3))

[1, 2, 3]


In [0]:
# Acción “takeOrdered”: cambiar criterio ordenación
    # También podemos pasar una función para ordenar como queramos.

# Crear RDD
numeros = sc.parallelize([3,2,1,4,5])

# Aplicar acción takeOrdered e imprimir lista en orden ascendente
print(numeros.takeOrdered(3, lambda elem: -elem))

[5, 4, 3]


In [0]:
## Pares ordenados

# Crear RDD
numeros = sc.parallelize([3,2,1,4,5])

pares = numeros.filter(lambda elemento: elemento%2 == 0)

# Aplicar acción takeOrdered e imprimir lista en orden ascendente
print(pares.takeOrdered(3))

[2, 4]


In [0]:
## Impares ordenados

# Crear RDD
numeros = sc.parallelize([3,2,1,4,5])

impares = numeros.filter(lambda elemento: elemento%2 != 0)

# Aplicar acción takeOrdered e imprimir lista en orden ascendente
print(impares.takeOrdered(3))

[1, 3, 5]


### 3. EJERCICIO PRÁCTICO: 	archivo de texto

#### Cómo “subimos” un archivo de texto
#
1. En el notebook nos vamos en el menú de arriba a “File”, desplegamos y picamos en “UploadData”.
2. En la nueva Ventana Podemos clicker para buscar el archivo, o directamente arrastrar al recuadro de fondo gris el archivo.
3. Una vez subido (tarda según el tamaño del archivo, conexión) le damos al botón “Next”.
4.  En esta nueva ventana solo clickamos en “Copy” en la esquina inferior derecha (resto opciones por defecto las dejamos) y finalmente a “Done”.
5. Volvemos al notebook y en una de las celdas copiamos el contenido. Ya tenemos la ruta del archivo.

In [0]:
# Ruta donde se encuentra almacenado el fichero quijote.txt
dbfs:/FileStore/shared_uploads/joseernst@gmail.com/quijote.txt

[0;36m  File [0;32m"<command-346751092172873>"[0;36m, line [0;32m2[0m
[0;31m    dbfs:/FileStore/shared_uploads/joseernst@gmail.com/quijote.txt[0m
[0m         ^[0m
[0;31mSyntaxError[0m[0;31m:[0m invalid syntax


In [0]:
# Ejercicio 1 : Contar caracteres de un fichero.

# Almacenamos la ruta en una variable 
file = 'dbfs:/FileStore/shared_uploads/joseernst@gmail.com/quijote-2.txt'

# Creamos un RDD del fichero quijote.txt
lineas = sc.textFile(file)

# Aplicamos la transformación map
long_lineas = lineas.map(lambda elemento: len(elemento))

# Aplicamos la acción reduce para obtener conteo de todos los caracteres e imprimimos
print(long_lineas.reduce(lambda elem1, elem2: elem1 + elem2))

1015974


In [0]:
# Confirmamos tipo de dato
type(lineas)

Out[5]: pyspark.rdd.RDD

In [0]:
# Exploramos fichero
lineas.collect()

Out[6]: ['DON QUIJOTE',
 'DE LA MANCHA',
 '(PRIMERA PARTE)',
 'Miguel de Cervantes Saavedra',
 'La presente edición corresponde a ¿? don Quijote',
 'http://www.donquijote.org',
 'PRIMERA PARTE',
 'CAPÍTULO 1: Que trata de la condición y ejercicio del famoso hidalgo D. Quijote de la Mancha',
 'En un lugar de la Mancha, de cuyo nombre no quiero acordarme, no ha mucho tiempo que vivía un hidalgo de los de lanza en astillero, adarga antigua, rocín flaco y galgo corredor. Una olla de algo más vaca que carnero, salpicón las más noches, duelos y quebrantos los sábados, lentejas los viernes, algún palomino de añadidura los domingos, consumían las tres partes de su hacienda. El resto della concluían sayo de velarte, calzas de velludo para las fiestas con sus pantuflos de lo mismo, los días de entre semana se honraba con su vellori de lo más fino. Tenía en su casa una ama que pasaba de los cuarenta, y una sobrina que no llegaba a los veinte, y un mozo de campo y plaza, que así ensillaba el rocín

In [0]:
# Podemos confirmar cantidad de caracteres por lineas en el fichero
long_lineas.collect()

Out[7]: [11,
 12,
 15,
 28,
 48,
 25,
 13,
 92,
 4120,
 4457,
 1775,
 88,
 4231,
 4445,
 883,
 142,
 2650,
 87,
 4217,
 1883,
 2692,
 4038,
 77,
 4248,
 1382,
 3007,
 4249,
 79,
 3723,
 533,
 3899,
 424,
 120,
 4377,
 1306,
 3253,
 4467,
 163,
 82,
 4403,
 4532,
 1102,
 177,
 4294,
 2473,
 1964,
 4539,
 2956,
 118,
 4427,
 100,
 4389,
 1772,
 99,
 4379,
 468,
 4045,
 1465,
 62,
 4420,
 4334,
 1210,
 600,
 1215,
 73,
 4446,
 788,
 3722,
 3379,
 79,
 3728,
 94,
 3457,
 1013,
 4534,
 165,
 55,
 3883,
 2393,
 102,
 251,
 174,
 173,
 109,
 117,
 176,
 177,
 222,
 184,
 179,
 105,
 110,
 188,
 596,
 186,
 112,
 108,
 181,
 175,
 219,
 177,
 189,
 104,
 104,
 187,
 179,
 176,
 397,
 1937,
 2457,
 3166,
 231,
 971,
 436,
 198,
 1132,
 115,
 4223,
 4467,
 1249,
 97,
 2904,
 1061,
 745,
 1720,
 974,
 735,
 326,
 2498,
 1515,
 607,
 708,
 842,
 583,
 173,
 2451,
 828,
 404,
 256,
 405,
 944,
 1647,
 800,
 993,
 1082,
 1133,
 1039,
 166,
 1208,
 1224,
 1086,
 477,
 803,
 132,
 1982,
 957,
 631,
 2

In [0]:
# Fin de la presentacion dia 3