In [2]:
%%capture
%%configure -f
{"name":"Nacho Riaño - Formación Spark RDDs",
"executorCores":1,
"numExecutors":1}

Starting Spark application
SparkSession available as 'spark'.


- `%%capture` evita que se muestre la salida por pantalla ([documentación](http://ipython.readthedocs.io/en/stable/interactive/magics.html#cellmagic-capture))
- `%%configure` establece parámetros de ejecución de spark ([documentación](https://github.com/jupyter-incubator/sparkmagic/blob/master/screenshots/help.png))

In [3]:
from pprint import pprint as pp #pretty printer

# RDDs simples

* Similares a una lista de Python
* Estructura de datos de solo lectura
* Colección de datos particionada y paralelizada
* Tolerante a fallos
* Se puede generar paralelizando una lista existente o basándose en un dataset 


## Creación

### Creacion desde un archivo de texto

In [53]:
rdd_quijote=sc.textFile("adl://barcelodatalake.azuredatalakestore.net/pruebas/Nacho/quijote-2parrafos.txt") 
pp(rdd_quijote.collect()) # Collect es una acción que tranforma una RDD en una lista            

['En un lugar de la Mancha, de cuyo nombre no quiero acordarme, no ha mucho',
 'tiempo que vivía un hidalgo de los de lanza en astillero, adarga antigua,',
 'rocín flaco y galgo corredor. Una olla de algo más vaca que carnero,',
 'salpicón las más noches, duelos y quebrantos los sábados, lantejas los',
 'viernes, algún palomino de añadidura los domingos, consumían las tres',
 'partes de su hacienda. El resto della concluían sayo de velarte, calzas de',
 'velludo para las fiestas, con sus pantuflos de lo mesmo, y los días de',
 'entresemana se honraba con su vellorí de lo más fino. Tenía en su casa una',
 'ama que pasaba de los cuarenta, y una sobrina que no llegaba a los veinte,',
 'y un mozo de campo y plaza, que así ensillaba el rocín como tomaba la',
 'podadera. Frisaba la edad de nuestro hidalgo con los cincuenta años; era de',
 'complexión recia, seco de carnes, enjuto de rostro, gran madrugador y amigo',
 'de la caza. Quieren decir que tenía el sobrenombre de Quijada, o Quesada,'

**Ejercicio** Carga el archivo `adl://barcelodatalake.azuredatalakestore.net/pruebas/formacion_abril_2018/viajes_por_españa.txt`

In [5]:
rdd_viajes_espana = sc.textFile("adl://barcelodatalake.azuredatalakestore.net/pruebas/formacion_abril_2018/viajes_por_españa.txt") 

['Si sois algo jinete (condición sine qua non); si contáis además con',
 'cuatro días y treinta duros de sobra, y tenéis, por último, en',
 'Navalmoral de la Mata algún conocido que os proporcione caballo y guía',
 'podéis hacer facilísimamente un viaje de primer orden que os ofrecerá',
 'reunidos los múltiples goces de una exploración geográfico-pintoresca,',
 'el grave interés de una excursión historial y artística, y la religiosa',
 'complacencia de aquellas romerías verdaderamente patrióticas que, como',
 'todo deber cumplido, ufanan y alegran el alma de los que todavía',
 'respetan algo sobre la tierra..... Podéis, en suma, visitar el',
 'Monasterio de Yuste.',
 '',
 'Para ello..... (suponemos que estáis en Madrid) empezaréis por tomar un',
 'billete, de berlina o de interior, hasta Navalmoral de la Mata, en la',
 '«Diligencia de Cáceres», que sale diariamente de la calle del Correo',
 'de ésta que fué corte, a las siete y media de la tarde.',
 '',
 'La carretera es buena por lo g

### Creación desde una lista python: parallelize

In [43]:
lista=["ab", "cosa", 4.3, 'ejemplo_1', 'ejemplo_2', 45, 85]
rdd=sc.parallelize(lista)


**Ejercicio:** 

Muestra por pantalla la lista contenida en el `rdd` previo


In [None]:
pp(rdd_viajes_espana.collect()) # Collect es una acción que tranforma una RDD en una lista   

**Ejercicio:**

Crea un rdd que contenga los números del 1 al 100  
*pista: puedes usar funciones de python, como por ejemplo range. Asegurate que contiene los números 1 y 100*

In [10]:
primeros_100_numeros = range(1,100)
rdd_primeros_100_numeros = sc.parallelize(primeros_100_numeros)
rdd_primeros_100_numeros.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]

### Almacenamiento de un rdd en el datalake: saveAsTextFile()

In [49]:
lista=["ab", "cosa", 4.3, 'ejemplo_1', 'ejemplo_2', 45, 85]
rdd=sc.parallelize(lista)
rdd.repartition(1).saveAsTextFile(path='adl://barcelodatalake.azuredatalakestore.net/pruebas/Nacho/saveastextfile_ejemplo')

### Creación desde cosmos

In [5]:
from pydocumentdb import document_client
from pydocumentdb import documents

COSMOSDB_MASTER_KEY = '31Q6cUXQrdSP3P19myCUZEAgTpdoru89C0mk4vo8kJZFaGAN0mPCEU5lFWwWdo4Z4FBKRMDo5WpZdw7EeP29tw=='
connection_policy = documents.ConnectionPolicy()
host = 'https://desbarcelocosmosdb.documents.azure.com:443/'
client = document_client.DocumentClient(host, {'masterKey': COSMOSDB_MASTER_KEY}, connection_policy)
collLinks = 'dbs/barcelo_des/colls/clientes_datos_basicos'


In [12]:
query = 'SELECT TOP 5 c.Datos_Comunicacion_Email.Emails[0].Email, c.Datos_Identificativos.Nombre, c.id FROM c \
        where c.Datos_Identificativos.Nombre != ""'


In [13]:
print('ejecuto la query')
resultados_consulta_documento_cliente = client.QueryDocuments(collLinks, {'query': query, },
                                                              {'enableCrossPartitionQuery': 'true'})
rdd3 = sc.parallelize(resultados_consulta_documento_cliente)
pp(rdd3.collect())

ejecuto la query
[{'Email': 'enzosar@hotmail.com',
  'Nombre': 'Enzo Adrian',
  'id': '0940087b-7e20-484c-91c4-b61fc2dfe9f7'},
 {'Email': 'lowellkaiser@hotmail.com',
  'Nombre': 'Lowell',
  'id': '0dd089f9-1782-4acd-a473-21a3f1027e78'},
 {'Email': 'lorportillo@hotmail.com',
  'Nombre': 'Lorena',
  'id': '032a82cf-c63f-4f09-9491-82e0ddde88f0'},
 {'Email': 'stucki@sbg.at',
  'Nombre': 'Andrea',
  'id': '0222b07e-1a6a-4621-85bd-2d6224eb3a95'},
 {'Email': 'potterfamilymom@gmail.com',
  'Nombre': 'Deborah Lynne',
  'id': '04f9ab61-34db-42e0-ae20-0e46c5aca780'}]

**Ejercicio:**
    
Crea y muestra por pantalla un rdd con el nombre y apellido de 10 personas con nacionalidad española  


In [14]:
rdd = sc.parallelize(['Pablo', 'Suaña', 'Joaquin', 'Cortes', 'Maria', 'Diez', 'Julian', 'Perez', 'Noelia', 'Martinez'])
rdd.collect()

['Pablo', 'Suaña', 'Joaquin', 'Cortes', 'Maria', 'Diez', 'Julian', 'Perez', 'Noelia', 'Martinez']

### Creación desde lista diccionarios

In [15]:
lista=[{'clave_1': 'valor_1'}, {'clave_21': 'valor_21', 'clave_22': 'valor_22'}, {'clave_3': 'valor_3'}]
rdd4=sc.parallelize(lista)
pp(rdd4.collect())

[{'clave_1': 'valor_1'},
 {'clave_21': 'valor_21', 'clave_22': 'valor_22'},
 {'clave_3': 'valor_3'}]

# Transformaciones y acciones con RDDs simples

### ¿Que es una transformación?
* *RDD-->RDD*: Crea un nuevo rdd a partir de uno ya existente
* *Lazy*, es decir, todas las transformaciones ejecutadas sobre un rdd son calculadas únicamente cuando se ejecuta una acción

### Tipos de transformaciones:
* *Narrow*: transformación que no necesita conocer los datos que están en el resto de nodos para  poder completarse con éxito. Ej: filter, map, etc
* *Wide*: transformación que para completarse necesita usar los datos que contienen el resto de nodos. Ej: groupBy, reduceByKey




### **Transformaciones más usadas:**

* map(func)
* flatMap(func)
* filter(func)
* sample(withReplacement, fraction, seed)
* union(otherDataset)
* intersection(otherDataset)
* distinct([numTasks]))
* repartitionAndSortWithinPartitions(partitioner)
* cogroup(otherDataset, [numTasks])
* cartesian(otherDataset)
* coalesce(numPartitions)
* repartition(numPartitions)


## Transformación Map

### Usando lambdas (para _map_'s sencillos)

In [8]:
numeros=[1,2,3,4]
rdd=sc.parallelize(numeros)
rdd.map(lambda x: 2*x).collect()

[2, 4, 6, 8]

In [10]:
palabras=["ejemplo","cosa","perro"]
rdd=sc.parallelize(palabras)
rdd.map(lambda x: x+1).collect() #Da un error ya que el +1 no se puede aplicar a strings

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 6.0 failed 4 times, most recent failure: Lost task 8.3 in stage 6.0 (TID 181, 10.0.0.7, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 174, in main
    process()
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<stdin>", line 3, in <lambda>
TypeError: Can't convert 'int' object to str implicitly

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:

### Usando funciones python para _map_'s complejos

In [83]:
def duplicar(x):
    factor = 2
    resultado = factor * x
    return resultado

In [7]:
rdd.map(duplicar).collect()

[2, 4, 'abab', 'cosacosa', 8.6]

In [8]:
rdd.map(duplicar).map(lambda x: "!"+str(x)+"!").collect() #2 maps encadenados -cada uno de un tipo-

['!2!', '!4!', '!abab!', '!cosacosa!', '!8.6!']

**Ejercicio:**

A partir del rdd que contiene números del 1 al 100, crea un rdd cuyos elementos sean el triple de estos

In [37]:
primeros_100_numeros = range(1,100)
rdd_primeros_100_numeros = sc.parallelize(primeros_100_numeros)
rdd_primeros_100_numeros_por3 = rdd_primeros_100_numeros.map(lambda numero: numero*3)
rdd_primeros_100_numeros_por3.collect()

[3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57, 60, 63, 66, 69, 72, 75, 78, 81, 84, 87, 90, 93, 96, 99, 102, 105, 108, 111, 114, 117, 120, 123, 126, 129, 132, 135, 138, 141, 144, 147, 150, 153, 156, 159, 162, 165, 168, 171, 174, 177, 180, 183, 186, 189, 192, 195, 198, 201, 204, 207, 210, 213, 216, 219, 222, 225, 228, 231, 234, 237, 240, 243, 246, 249, 252, 255, 258, 261, 264, 267, 270, 273, 276, 279, 282, 285, 288, 291, 294, 297]

**Ejercicio:**
    
A partir del rdd que contiene el texto 'Viajes por España'...

Añade el símbolo # al principio de cada línea y muestra el texto por pantalla

In [18]:
rdd_viajes_espana.map(lambda linea: '#'+linea).collect()

['#Si sois algo jinete (condición sine qua non); si contáis además con', '#cuatro días y treinta duros de sobra, y tenéis, por último, en', '#Navalmoral de la Mata algún conocido que os proporcione caballo y guía', '#podéis hacer facilísimamente un viaje de primer orden que os ofrecerá', '#reunidos los múltiples goces de una exploración geográfico-pintoresca,', '#el grave interés de una excursión historial y artística, y la religiosa', '#complacencia de aquellas romerías verdaderamente patrióticas que, como', '#todo deber cumplido, ufanan y alegran el alma de los que todavía', '#respetan algo sobre la tierra..... Podéis, en suma, visitar el', '#Monasterio de Yuste.', '#', '#Para ello..... (suponemos que estáis en Madrid) empezaréis por tomar un', '#billete, de berlina o de interior, hasta Navalmoral de la Mata, en la', '#«Diligencia de Cáceres», que sale diariamente de la calle del Correo', '#de ésta que fué corte, a las siete y media de la tarde.', '#', '#La carretera es buena por lo 

**Ejercicio:**
    
A partir del rdd que contiene el texto 'Viajes por España'...  
Separa cada línea en palabras (entendemos por palabra un texto delimitado por espacios) y muestralo por pantalla. Deberías obtener una lista de listas  
Pista: usar el método split() de python

In [19]:
rdd_viajes_espana.map(lambda linea: linea.split()).collect()

[['Si', 'sois', 'algo', 'jinete', '(condición', 'sine', 'qua', 'non);', 'si', 'contáis', 'además', 'con'], ['cuatro', 'días', 'y', 'treinta', 'duros', 'de', 'sobra,', 'y', 'tenéis,', 'por', 'último,', 'en'], ['Navalmoral', 'de', 'la', 'Mata', 'algún', 'conocido', 'que', 'os', 'proporcione', 'caballo', 'y', 'guía'], ['podéis', 'hacer', 'facilísimamente', 'un', 'viaje', 'de', 'primer', 'orden', 'que', 'os', 'ofrecerá'], ['reunidos', 'los', 'múltiples', 'goces', 'de', 'una', 'exploración', 'geográfico-pintoresca,'], ['el', 'grave', 'interés', 'de', 'una', 'excursión', 'historial', 'y', 'artística,', 'y', 'la', 'religiosa'], ['complacencia', 'de', 'aquellas', 'romerías', 'verdaderamente', 'patrióticas', 'que,', 'como'], ['todo', 'deber', 'cumplido,', 'ufanan', 'y', 'alegran', 'el', 'alma', 'de', 'los', 'que', 'todavía'], ['respetan', 'algo', 'sobre', 'la', 'tierra.....', 'Podéis,', 'en', 'suma,', 'visitar', 'el'], ['Monasterio', 'de', 'Yuste.'], [], ['Para', 'ello.....', '(suponemos', 'que

## Transformación flatMap

In [6]:
def factores(x):
    lista_factores=[]   
    for i in range(1, x + 1):
        if x % i == 0:
            lista_factores.append(i)
            
    
    return lista_factores         

rdd=sc.parallelize([1,15,21,77])
print('map normal: \n', rdd.map(factores).collect(),'\n')

print('flatMap: \n',rdd.flatMap(factores).collect(), '\n')

map normal: 
 [[1], [1, 3, 5, 15], [1, 3, 7, 21], [1, 7, 11, 77]] 

flatMap: 
 [1, 1, 3, 5, 15, 1, 3, 7, 21, 1, 7, 11, 77]

**Ejercicio:**
    
A partir del rdd que contiene el texto 'Viajes por España'...

Separa cada línea en palabras (entendemos por palabra lo delimitados por espacios) y obten esta vez una lista de strings  
Muestra el resultado por pantalla


In [113]:
rdd_palabras_viajes_espana = rdd_viajes_espana.flatMap(lambda linea: linea.split())
rdd_palabras_viajes_espana.collect()

['Si', 'sois', 'algo', 'jinete', '(condición', 'sine', 'qua', 'non);', 'si', 'contáis', 'además', 'con', 'cuatro', 'días', 'y', 'treinta', 'duros', 'de', 'sobra,', 'y', 'tenéis,', 'por', 'último,', 'en', 'Navalmoral', 'de', 'la', 'Mata', 'algún', 'conocido', 'que', 'os', 'proporcione', 'caballo', 'y', 'guía', 'podéis', 'hacer', 'facilísimamente', 'un', 'viaje', 'de', 'primer', 'orden', 'que', 'os', 'ofrecerá', 'reunidos', 'los', 'múltiples', 'goces', 'de', 'una', 'exploración', 'geográfico-pintoresca,', 'el', 'grave', 'interés', 'de', 'una', 'excursión', 'historial', 'y', 'artística,', 'y', 'la', 'religiosa', 'complacencia', 'de', 'aquellas', 'romerías', 'verdaderamente', 'patrióticas', 'que,', 'como', 'todo', 'deber', 'cumplido,', 'ufanan', 'y', 'alegran', 'el', 'alma', 'de', 'los', 'que', 'todavía', 'respetan', 'algo', 'sobre', 'la', 'tierra.....', 'Podéis,', 'en', 'suma,', 'visitar', 'el', 'Monasterio', 'de', 'Yuste.', 'Para', 'ello.....', '(suponemos', 'que', 'estáis', 'en', 'Madri

## Transformación mapPartitions

In [36]:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8], 3)
print("particiones",rdd.glom().collect())
def s(iterator): yield sum(iterator)
def m(iterator): yield max(iterator)

print("suma",rdd.mapPartitions(s).collect())
print("suma",rdd.mapPartitions(m).collect())

particiones [[1, 2], [3, 4], [5, 6, 7, 8]]
suma [3, 7, 26]
suma [2, 4, 8]

## Transformación filter

In [88]:
rdd = sc.parallelize(['a', 'b', 1, 13, 'ab', 12])

print('rdd original: ', rdd.collect())
print('rdd filtrado: ', rdd.filter(lambda x:  type(x) is not str ).collect())

print('rdd filtrado y aplicado un map: ', 
      rdd.filter(lambda x:  type(x) is not str )\
      .map(duplicar).collect()) # Todas las transformaciones se pueden encadenar

rdd original:  ['a', 'b', 1, 13, 'ab', 12]
rdd filtrado:  [1, 13, 12]
rdd filtrado y aplicado un map:  [2, 26, 24]

**Ejercicio:**
    
A partir del rdd que contiene los numeros del 1 al 100 encuentra los divisibles por 7

In [25]:
rdd_primeros_100_numeros.filter(lambda numero: numero%7 == 0).collect()

[7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84, 91, 98]

**Ejercicio:**
    
A partir del rdd que contiene el texto 'Viajes por España'... encuentra:
* Las líneas que contienen hotel u hoteles    

In [27]:
rdd_viajes_espana.filter(lambda linea: 'hotel' in linea).collect()

['habréis hecho perfectamente, pues no os esperan grandes hoteles, que']

* Las palabras de menos de 6 letras  
Pista: dividir la línea en palabras previamente y usar la función de Python 'len'
    

In [32]:
rdd_viajes_espana.flatMap(lambda linea: linea.split()).filter(lambda palabra: len(palabra)<6).collect()

['Si', 'sois', 'algo', 'sine', 'qua', 'non);', 'si', 'con', 'días', 'y', 'duros', 'de', 'y', 'por', 'en', 'de', 'la', 'Mata', 'algún', 'que', 'os', 'y', 'guía', 'hacer', 'un', 'viaje', 'de', 'orden', 'que', 'os', 'los', 'goces', 'de', 'una', 'el', 'grave', 'de', 'una', 'y', 'y', 'la', 'de', 'que,', 'como', 'todo', 'deber', 'y', 'el', 'alma', 'de', 'los', 'que', 'algo', 'sobre', 'la', 'en', 'suma,', 'el', 'de', 'Para', 'que', 'en', 'por', 'tomar', 'un', 'de', 'o', 'de', 'hasta', 'de', 'la', 'Mata,', 'en', 'la', 'de', 'que', 'sale', 'de', 'la', 'calle', 'del', 'de', 'ésta', 'que', 'fué', 'a', 'las', 'siete', 'y', 'media', 'de', 'la', 'La', 'es', 'buena', 'por', 'lo', 'y', 'en', 'por', 'la', 'de', 'los', 'donde', 'los', 'su', 'muy', 'por', 'las', 'de', 'y', 'por', 'que', 'es', 'como', 'si', 'por', 'el', 'de', 'los', 'por', 'donde', 'os', 'de', 'su', 'y', 'de', 'su', 'del', 'año', 'de', '1808;', 'por', 'uno', 'de', 'los', 'que', 'de', 'a', 'por', 'que', 'nada', 'tiene', 'de', 'ni', 'de', '

**Ejercicio:**
    
A partir del rdd que contiene el texto 'Viajes por España'...

Separa cada línea en palabras (entendemos por palabra lo delimitados por espacios).
Despues, convierte cada palabra a minusculas y elimina los caracteres no alfanuméricos.
Finalmente, muestra el resultado por pantalla  
Pista: Para eliminar los carácteres no alfanumericos usar la expresión regular '[^\w]' y la función *sub* de la librería re  
       Para poner en minúscula se usa el método lower()


In [117]:
import re
rdd_palabras_alfanum_viajes_espana = rdd_viajes_espana.flatMap(lambda linea: linea.split()).map(lambda palabra: re.sub('[^\w]', '', palabra.lower()))
rdd_palabras_alfanum_viajes_espana.collect()

['si', 'sois', 'algo', 'jinete', 'condición', 'sine', 'qua', 'non', 'si', 'contáis', 'además', 'con', 'cuatro', 'días', 'y', 'treinta', 'duros', 'de', 'sobra', 'y', 'tenéis', 'por', 'último', 'en', 'navalmoral', 'de', 'la', 'mata', 'algún', 'conocido', 'que', 'os', 'proporcione', 'caballo', 'y', 'guía', 'podéis', 'hacer', 'facilísimamente', 'un', 'viaje', 'de', 'primer', 'orden', 'que', 'os', 'ofrecerá', 'reunidos', 'los', 'múltiples', 'goces', 'de', 'una', 'exploración', 'geográficopintoresca', 'el', 'grave', 'interés', 'de', 'una', 'excursión', 'historial', 'y', 'artística', 'y', 'la', 'religiosa', 'complacencia', 'de', 'aquellas', 'romerías', 'verdaderamente', 'patrióticas', 'que', 'como', 'todo', 'deber', 'cumplido', 'ufanan', 'y', 'alegran', 'el', 'alma', 'de', 'los', 'que', 'todavía', 'respetan', 'algo', 'sobre', 'la', 'tierra', 'podéis', 'en', 'suma', 'visitar', 'el', 'monasterio', 'de', 'yuste', 'para', 'ello', 'suponemos', 'que', 'estáis', 'en', 'madrid', 'empezaréis', 'por', 

## Transformación sample

In [44]:
rdd = sc.parallelize(['a','b','c', 'd','e','f', 1,2,3,4])

print(rdd.sample(withReplacement=False, fraction=0.2).collect())
print(rdd.sample(withReplacement=True, fraction=0.5).collect())
print(rdd.sample(withReplacement=True, fraction=0.5).collect())

[1, 2]
['a', 'c', 'c']
['a', 'a', 'b', 'f', 1, 2, 3, 4]

## Transformación union

In [122]:
rdd_1 = sc.parallelize([1,2,3, 4])
rdd_2 = sc.parallelize([1, 11,22,33, 4])

rdd = rdd_1.union(rdd_2)
rdd.collect()

[1, 2, 3, 4, 11, 22, 33, 4]

## Transformación intersection

In [121]:
rdd_1 = sc.parallelize([1, 2, 3, 4])
rdd_2 = sc.parallelize([1, 11,22,33, 4])

rdd = rdd_1.intersection(rdd_2)
rdd.collect()

[4]

**Ejercicio:**
    
A partir del rdd generado con los valores multiplicados por tres y el rdd original construido a partir de la lista de valores del 1 al 100...

* Tenerar un rdd que sea la unión de ambos    

In [39]:
rdd_primeros_100_numeros.union(rdd_primeros_100_numeros_por3).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57, 60, 63, 66, 69, 72, 75, 78, 81, 84, 87, 90, 93, 96, 99, 102, 105, 108, 111, 114, 117, 120, 123, 126, 129, 132, 135, 138, 141, 144, 147, 150, 153, 156, 159, 162, 165, 168, 171, 174, 177, 180, 183, 186, 189, 192, 195, 198, 201, 204, 207, 210, 213, 216, 219, 222, 225, 228, 231, 234, 237, 240, 243, 246, 249, 252, 255, 258, 261, 264, 267, 270, 273, 276, 279, 282, 285, 288, 291, 294, 297]

* Generar otro rdd que sea la intersección entre ambos


    

In [41]:
sorted(rdd_primeros_100_numeros.intersection(rdd_primeros_100_numeros_por3).collect())

[3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57, 60, 63, 66, 69, 72, 75, 78, 81, 84, 87, 90, 93, 96, 99]

* Tomar una muestra de registros al azar (semilla = 1) con fraction=0.2 sin reemplazamiento del rdd con los valores del 1 al 100 y hacer la union y la intersección con el rdd de valores * 3


    

In [50]:
rdd_random = rdd_primeros_100_numeros.sample(withReplacement=False, fraction=0.2, seed=1)
rdd_random.collect()

[10, 12, 14, 31, 42, 64, 74, 79, 85, 91, 95]

In [51]:
rdd_random.union(rdd_primeros_100_numeros_por3).collect()

[10, 12, 14, 31, 42, 64, 74, 79, 85, 91, 95, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57, 60, 63, 66, 69, 72, 75, 78, 81, 84, 87, 90, 93, 96, 99, 102, 105, 108, 111, 114, 117, 120, 123, 126, 129, 132, 135, 138, 141, 144, 147, 150, 153, 156, 159, 162, 165, 168, 171, 174, 177, 180, 183, 186, 189, 192, 195, 198, 201, 204, 207, 210, 213, 216, 219, 222, 225, 228, 231, 234, 237, 240, 243, 246, 249, 252, 255, 258, 261, 264, 267, 270, 273, 276, 279, 282, 285, 288, 291, 294, 297]

In [52]:
rdd_random.intersection(rdd_primeros_100_numeros_por3).collect()

[12, 42]

**Ejercicio:**
Obtén una lista de las palabras que aparecen en ambos textos ("El quijote" y "Viajes por España").  
*Pista: recuerda que es necesario normalizar las palabras antes...*

In [63]:
rdd_viajes_espana_norm = rdd_viajes_espana.flatMap(lambda linea: linea.split()).map(lambda palabra: palabra.strip().lower())
rdd_quijote_norm = rdd_quijote.flatMap(lambda linea: linea.split()).map(lambda palabra: palabra.strip().lower())
sorted(rdd_viajes_espana_norm.intersection(rdd_quijote_norm).collect())

['a', 'algo', 'algún', 'aquellas', 'bien', 'casi', 'como', 'con', 'de', 'del', 'donde', 'días', 'el', 'en', 'este', 'haber', 'la', 'las', 'lo', 'los', 'muchas', 'no', 'nombre', 'o', 'os', 'para', 'pero', 'por', 'que', 'se', 'su', 'todo', 'un', 'una', 'vuestra', 'y']

## Transformación distinct

In [65]:
def factores(x):
    lista_factores=[]   
    for i in range(1, x + 1):
        if x % i == 0:
            lista_factores.append(i)
                        
    
    return lista_factores         

numeros=[1,15,21,77]
rdd=sc.parallelize(numeros)
print('antes del distinct:\n', rdd.flatMap(factores).collect())
print('\n despues del distinct: ', rdd.flatMap(factores).distinct().collect())

antes del distinct:
 [1, 1, 3, 5, 15, 1, 3, 7, 21, 1, 7, 11, 77]

 despues del distinct:  [11, 1, 3, 21, 7, 15, 77, 5]

**Ejercicio:**
    
En el ejemplo previo sobre `distinct`, se llama a flatMap dos veces, lo cual es ineficiente -y por lo tanto una mala práctica-  
Corrije el ejemplo.

In [66]:
rdd_flatmapeado = rdd.flatMap(factores)

print('antes del distinct:\n', rdd_flatmapeado.collect())
print('\n despues del distinct: ', rdd_flatmapeado.distinct().collect())

antes del distinct:
 [1, 1, 3, 5, 15, 1, 3, 7, 21, 1, 7, 11, 77]

 despues del distinct:  [11, 1, 3, 21, 7, 15, 77, 5]

## Transformación cartesian

In [72]:
rdd1 = sc.parallelize([1, 2])
rdd2 = sc.parallelize([3, 4])
sorted(rdd1.cartesian(rdd2).collect())


[(1, 3), (1, 4), (2, 3), (2, 4)]

**Ejercicio**:  
Generar un rdd con todos los enfrentamientos posibles entre estos tres 
equipos:”Ibiza FC”, ”Mallorca FC” y ”Menorca FC”

Hazlo de dos maneras:
* No nos importa que un equipo se enfrente a sí mismo


In [76]:
rdd_1 = sc.parallelize(['Ibiza', 'Mallorca', 'Menorca'])
rdd_2 = sc.parallelize(['Ibiza', 'Mallorca', 'Menorca'])

rdd_enfrentamientos = rdd_1.cartesian(rdd_2)
rdd_enfrentamientos.collect()

[('Ibiza', 'Ibiza'), ('Ibiza', 'Mallorca'), ('Ibiza', 'Menorca'), ('Mallorca', 'Ibiza'), ('Menorca', 'Ibiza'), ('Mallorca', 'Mallorca'), ('Mallorca', 'Menorca'), ('Menorca', 'Mallorca'), ('Menorca', 'Menorca')]

* No queremos que un equipo pueda enfrentarse a sí mismo


In [77]:
rdd_enfrentamientos.filter(lambda elemento: elemento[0] != elemento[1]).collect()

[('Ibiza', 'Mallorca'), ('Ibiza', 'Menorca'), ('Mallorca', 'Ibiza'), ('Menorca', 'Ibiza'), ('Mallorca', 'Menorca'), ('Menorca', 'Mallorca')]

## Transformación coalesce y repartition

In [25]:
rdd1 = sc.parallelize([1, 2, 3, 4, 14, 15, 16, 17, 18, 19, 20, 5, 6, 7, 8, 9, 10, 11, 12, 13], 4)

print('rdd original:\n', rdd1.glom().collect())
print('\nrdd coalesce:\n', rdd1.coalesce(3).glom().collect())
print('\nrdd repartition:\n', rdd1.repartition(3).glom().collect())
print('\nrdd repartition:\n', rdd1.repartition(6).glom().collect())

rdd original:
 [[1, 2, 3, 4, 14], [15, 16, 17, 18, 19], [20, 5, 6, 7, 8], [9, 10, 11, 12, 13]]

rdd coalesce:
 [[1, 2, 3, 4, 14], [15, 16, 17, 18, 19], [20, 5, 6, 7, 8, 9, 10, 11, 12, 13]]

rdd repartition:
 [[9, 10, 11, 12, 13], [1, 2, 3, 4, 14, 15, 16, 17, 18, 19], [20, 5, 6, 7, 8]]

rdd repartition:
 [[], [1, 2, 3, 4, 14], [], [9, 10, 11, 12, 13], [15, 16, 17, 18, 19], [20, 5, 6, 7, 8]]

## Transformación Sample

In [16]:
import random

def crear_1000_numeros_aleatorios():
    numeros=[random.randint(0,42) for x in range(0,1000)]
    return numeros

rdd=sc.parallelize(range(0,100)).flatMap(lambda _:crear_1000_numeros_aleatorios())
    

In [45]:
sampled_rdd=rdd.sample(withReplacement=False, fraction=0.001)

In [46]:
cantidad_estimacion=sampled_rdd.count()
media_estimada=sampled_rdd.reduce(lambda a,b:a+b)/cantidad_estimacion
print("Media: %2.4f basada en una muestra de %d numeros" % (media_estimada, cantidad_estimacion))

Media: 19.7093 basada en una muestra de 86 numeros

In [47]:
rdd.reduce(lambda a,b:a+b)/rdd.count() #media real

20.9684

In [48]:
sampled_rdd.collect()

[31, 14, 39, 35, 1, 22, 28, 12, 5, 14, 6, 30, 11, 42, 37, 42, 30, 39, 13, 7, 10, 24, 15, 35, 5, 38, 40, 28, 30, 12, 0, 14, 1, 22, 42, 0, 41, 24, 11, 27, 22, 25, 13, 23, 3, 32, 27, 4, 33, 9, 22, 16, 12, 2, 1, 42, 25, 8, 4, 5, 17, 14, 41, 29, 31, 27, 11, 19, 33, 22, 11, 6, 14, 9, 6, 3, 31, 19, 22, 23, 10, 18, 4, 24, 37, 9]

In [49]:
sampled_rdd.collect()

[31, 14, 39, 35, 1, 22, 28, 12, 5, 14, 6, 30, 11, 42, 37, 42, 30, 39, 13, 7, 10, 24, 15, 35, 5, 38, 40, 28, 30, 12, 0, 14, 1, 22, 42, 0, 41, 24, 11, 27, 22, 25, 13, 23, 3, 32, 27, 4, 33, 9, 22, 16, 12, 2, 1, 42, 25, 8, 4, 5, 17, 14, 41, 29, 31, 27, 11, 19, 33, 22, 11, 6, 14, 9, 6, 3, 31, 19, 22, 23, 10, 18, 4, 24, 37, 9]

# Acciones

-Que son?

## Acción Reduce

In [19]:
numeros=[1,2,3,4]
rdd=sc.parallelize(numeros)
rdd.reduce(lambda a,b: a+b)

10

In [24]:
numeros=[1,2,3,4]
rdd=sc.parallelize(numeros)
def suma(a,b):
    return a+b

rdd.reduce(suma)

10

In [25]:
rdd.reduce(lambda acumulado, nuevo: str(acumulado)+" <"+str(nuevo)+"> ")

'1 <2>  <3 <4> > '

**Ejercicios:** 
* Encuentra el valor máximo del rdd generado a partir de los números del 1 al 100

In [80]:
rdd_primeros_100_numeros.reduce(max)

99

* Encuentra el valor máximo divisible por 7 del rdd generado a partir de los números del 1 al 100

In [81]:
rdd_primeros_100_numeros.filter(lambda numero: numero%7 == 0).reduce(max)

98

## Acción takeOrdered

In [2]:
rdd_1 = sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7])
rdd_1.takeOrdered(6)

[1, 2, 3, 4, 5, 6]

In [1]:
rdd_1 = sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7])
rdd_1.takeOrdered(6, key=lambda x:-x)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2061,application_1520861122949_1114,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.
[10, 9, 7, 6, 5, 4]

## Acción TakeSample

In [37]:
lista=["ab", "cosa", 4.3, 'ejemplo_1', 'ejemplo_2', 45, 85]
rdd=sc.parallelize(lista)
print(rdd.takeSample(withReplacement=True, num=4, seed=1))
print(rdd.takeSample(withReplacement=False, num=2))
print(rdd.takeSample(withReplacement=True, num=4))


[85, 'ejemplo_2', 'ejemplo_1', 'ejemplo_1']
['ejemplo_1', 4.3]
[4.3, 'ejemplo_1', 4.3, 4.3]

# Pair RDDs

- Es un rdd cuyos elementos son pares clave-valor, siendo la key el primer elemento y la clave el que ocupa la segunda posición


- Se puede crear a partir de un rdd cualquiera usando dividiendo sus elementos en una clave y un valor a través de un map del tipo: `map(lambda elemento: (elemento.split(‘,’)[0], elemento.split(‘,’)[1]))`


- Son la base para las transformaciones por key, tales como groupByKey(), reduceByKey(), etc…


## Creación

In [26]:
PAIS=0
MEDALLAS=1
DEPORTE=2


datos=[("España",1,"Snowboard"),("Canada",2,"Ski"),("España",1,"Patinaje"),
       ("Canada",1,"Biatlón"),("USA",1,"Salto"),("Canada",1,"Bobsleigh")]

rdd=sc.parallelize(datos)

pair_rdd=rdd.map(lambda dato: (dato[PAIS], dato)) #mapeamos pais->(pais,medallas, deporte)
import pprint

pprint.pprint(pair_rdd.collect())


[('España', ('España', 1, 'Snowboard')),
 ('Canada', ('Canada', 2, 'Ski')),
 ('España', ('España', 1, 'Patinaje')),
 ('Canada', ('Canada', 1, 'Biatlón')),
 ('USA', ('USA', 1, 'Salto')),
 ('Canada', ('Canada', 1, 'Bobsleigh'))]

**Ejercicio:**  
A partir del RDD que contiene números del 1 al 100, crea una PairRDD cuya clave sea el número y cuyo valor si es par o no


In [86]:
def par_o_no(numero):
    if numero%2 == 0:
        return 'par'
    else:
        return 'impar'
    
    
pair_rdd_par_impar = rdd_primeros_100_numeros.map(lambda numero: (str(numero), par_o_no(numero)))
pair_rdd_par_impar.collect()

[('1', 'impar'), ('2', 'par'), ('3', 'impar'), ('4', 'par'), ('5', 'impar'), ('6', 'par'), ('7', 'impar'), ('8', 'par'), ('9', 'impar'), ('10', 'par'), ('11', 'impar'), ('12', 'par'), ('13', 'impar'), ('14', 'par'), ('15', 'impar'), ('16', 'par'), ('17', 'impar'), ('18', 'par'), ('19', 'impar'), ('20', 'par'), ('21', 'impar'), ('22', 'par'), ('23', 'impar'), ('24', 'par'), ('25', 'impar'), ('26', 'par'), ('27', 'impar'), ('28', 'par'), ('29', 'impar'), ('30', 'par'), ('31', 'impar'), ('32', 'par'), ('33', 'impar'), ('34', 'par'), ('35', 'impar'), ('36', 'par'), ('37', 'impar'), ('38', 'par'), ('39', 'impar'), ('40', 'par'), ('41', 'impar'), ('42', 'par'), ('43', 'impar'), ('44', 'par'), ('45', 'impar'), ('46', 'par'), ('47', 'impar'), ('48', 'par'), ('49', 'impar'), ('50', 'par'), ('51', 'impar'), ('52', 'par'), ('53', 'impar'), ('54', 'par'), ('55', 'impar'), ('56', 'par'), ('57', 'impar'), ('58', 'par'), ('59', 'impar'), ('60', 'par'), ('61', 'impar'), ('62', 'par'), ('63', 'impar'),

**Ejercicio:**  
A partir del RDD del ejercicio anterior, intercambia la clave y el valor


In [87]:
pair_rdd_par_impar.map(lambda clave_valor: (clave_valor[1], clave_valor[0])).collect()

[('impar', '1'), ('par', '2'), ('impar', '3'), ('par', '4'), ('impar', '5'), ('par', '6'), ('impar', '7'), ('par', '8'), ('impar', '9'), ('par', '10'), ('impar', '11'), ('par', '12'), ('impar', '13'), ('par', '14'), ('impar', '15'), ('par', '16'), ('impar', '17'), ('par', '18'), ('impar', '19'), ('par', '20'), ('impar', '21'), ('par', '22'), ('impar', '23'), ('par', '24'), ('impar', '25'), ('par', '26'), ('impar', '27'), ('par', '28'), ('impar', '29'), ('par', '30'), ('impar', '31'), ('par', '32'), ('impar', '33'), ('par', '34'), ('impar', '35'), ('par', '36'), ('impar', '37'), ('par', '38'), ('impar', '39'), ('par', '40'), ('impar', '41'), ('par', '42'), ('impar', '43'), ('par', '44'), ('impar', '45'), ('par', '46'), ('impar', '47'), ('par', '48'), ('impar', '49'), ('par', '50'), ('impar', '51'), ('par', '52'), ('impar', '53'), ('par', '54'), ('impar', '55'), ('par', '56'), ('impar', '57'), ('par', '58'), ('impar', '59'), ('par', '60'), ('impar', '61'), ('par', '62'), ('impar', '63'),

## Transformación groupBy (RDD->PairRDD)

In [98]:
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result = rdd.groupBy(lambda x: x % 2)
result.collect()

[(0, <pyspark.resultiterable.ResultIterable object at 0x7f92b12d8198>), (1, <pyspark.resultiterable.ResultIterable object at 0x7f92b12d84e0>)]

**Ejercicio:**  
Como has visto, groupBy devuelve los values como iterables, utiliza map para que sean una lista

In [99]:
result.map(lambda pair: (pair[0], list(pair[1]))).collect()

[(0, [2, 8]), (1, [1, 1, 3, 5])]

**Ejercicio:**  
A partir del rdd generado de los 100 números. Agrupar los números 
según sean multiplos sólo de 3, sólo de 2, de ambos o de ninguno  
Pista: Usar una función para simplificar la sentencia

In [107]:
def multiplos(numero):
    if numero%2 == 0:
        if numero%3!=0:
            return('multiplo de 2')
        else:
            return('multiplo de 2 y 3')
    else:
        if numero%3!=0:
            return('multiplo de ninguno')
        else:
            return('multiplo de 3')


rdd_primeros_100_numeros.groupBy(multiplos).map(lambda pair: (pair[0],list(pair[1]))).collect()

[('multiplo de 2 y 3', [6, 12, 18, 24, 30, 36, 42, 48, 54, 60, 66, 72, 78, 84, 90, 96]), ('multiplo de ninguno', [1, 5, 7, 11, 13, 17, 19, 23, 25, 29, 31, 35, 37, 41, 43, 47, 49, 53, 55, 59, 61, 65, 67, 71, 73, 77, 79, 83, 85, 89, 91, 95, 97]), ('multiplo de 3', [3, 9, 15, 21, 27, 33, 39, 45, 51, 57, 63, 69, 75, 81, 87, 93, 99]), ('multiplo de 2', [2, 4, 8, 10, 14, 16, 20, 22, 26, 28, 32, 34, 38, 40, 44, 46, 50, 52, 56, 58, 62, 64, 68, 70, 74, 76, 80, 82, 86, 88, 92, 94, 98])]

**Ejercicio:**  
- Listar las palabras del texto 'Viajes por España' que tienen más de 5 letras

In [120]:
rdd_viajes_espana.flatMap(lambda linea: linea.split()).map(lambda palabra: re.sub('[^\w]', '', palabra.lower()))\
.filter(lambda palabra: len(palabra)> 5).collect()

['jinete', 'condición', 'contáis', 'además', 'cuatro', 'treinta', 'tenéis', 'último', 'navalmoral', 'conocido', 'proporcione', 'caballo', 'podéis', 'facilísimamente', 'primer', 'ofrecerá', 'reunidos', 'múltiples', 'exploración', 'geográficopintoresca', 'interés', 'excursión', 'historial', 'artística', 'religiosa', 'complacencia', 'aquellas', 'romerías', 'verdaderamente', 'patrióticas', 'cumplido', 'ufanan', 'alegran', 'todavía', 'respetan', 'tierra', 'podéis', 'visitar', 'monasterio', 'suponemos', 'estáis', 'madrid', 'empezaréis', 'billete', 'berlina', 'interior', 'navalmoral', 'diligencia', 'cáceres', 'diariamente', 'correo', 'carretera', 'general', 'ningún', 'paraje', 'peligrosa', 'pasaréis', 'sucesivamente', 'dehesa', 'carabancheles', 'artilleros', 'tenían', 'establecida', 'notable', 'escuela', 'práctica', 'ventas', 'alcorcón', 'alcorcón', 'dijéramos', 'sèvres', 'actuales', 'madrileños', 'móstoles', 'acordaréis', 'órgano', 'célebre', 'alcalde', 'navalcarnero', 'principales', 'lagare

- Agrupar las palabras del texto 'Viajes por España' según si tienen más de 5 letras o no

In [119]:
def longitud_palabras(palabra):
    if len(palabra)>5:
        return 'mayor de 5 letras'
    elif len(palabra)<5:
        return 'menor que 5 letras'
    else:
        return '5 letras'

rdd_viajes_espana.flatMap(lambda linea: linea.split()).map(lambda palabra: re.sub('[^\w]', '', palabra.lower()))\
.groupBy(lambda palabra: longitud_palabras(palabra)).map(lambda pair: (pair[0], list(pair[1]))).collect()

[('5 letras', ['duros', 'sobra', 'algún', 'hacer', 'viaje', 'orden', 'goces', 'grave', 'deber', 'sobre', 'yuste', 'tomar', 'hasta', 'calle', 'corte', 'siete', 'media', 'tarde', 'buena', 'donde', 'mismo', 'donde', 'tiene', 'valle', 'ocupa', 'santa', 'santa', 'alvar', 'gómez', 'ambos', 'otoño', 'reina', 'dicho', 'visto', 'causa', 'noche', 'haber', 'sueño', 'decía', 'entre', 'hecho', 'donde', 'coche', 'cosas']), ('mayor de 5 letras', ['jinete', 'condición', 'contáis', 'además', 'cuatro', 'treinta', 'tenéis', 'último', 'navalmoral', 'conocido', 'proporcione', 'caballo', 'podéis', 'facilísimamente', 'primer', 'ofrecerá', 'reunidos', 'múltiples', 'exploración', 'geográficopintoresca', 'interés', 'excursión', 'historial', 'artística', 'religiosa', 'complacencia', 'aquellas', 'romerías', 'verdaderamente', 'patrióticas', 'cumplido', 'ufanan', 'alegran', 'todavía', 'respetan', 'tierra', 'podéis', 'visitar', 'monasterio', 'suponemos', 'estáis', 'madrid', 'empezaréis', 'billete', 'berlina', 'inter

- Intenta realizar este ejercicio repitiendo el menor código posible

In [122]:
rdd_palabras_alfanum_viajes_espana = rdd_viajes_espana.flatMap(lambda linea: linea.split())\
.map(lambda palabra: re.sub('[^\w]', '', palabra.lower()))
print('Primer apartado:\n')
print(rdd_palabras_alfanum_viajes_espana.filter(lambda palabra: len(palabra)> 5).collect())
print('\nSegundo apartado:\n')
print(rdd_palabras_alfanum_viajes_espana.groupBy(lambda palabra: longitud_palabras(palabra))\
      .map(lambda pair: (pair[0], list(pair[1]))).collect())

Primer apartado:

['jinete', 'condición', 'contáis', 'además', 'cuatro', 'treinta', 'tenéis', 'último', 'navalmoral', 'conocido', 'proporcione', 'caballo', 'podéis', 'facilísimamente', 'primer', 'ofrecerá', 'reunidos', 'múltiples', 'exploración', 'geográficopintoresca', 'interés', 'excursión', 'historial', 'artística', 'religiosa', 'complacencia', 'aquellas', 'romerías', 'verdaderamente', 'patrióticas', 'cumplido', 'ufanan', 'alegran', 'todavía', 'respetan', 'tierra', 'podéis', 'visitar', 'monasterio', 'suponemos', 'estáis', 'madrid', 'empezaréis', 'billete', 'berlina', 'interior', 'navalmoral', 'diligencia', 'cáceres', 'diariamente', 'correo', 'carretera', 'general', 'ningún', 'paraje', 'peligrosa', 'pasaréis', 'sucesivamente', 'dehesa', 'carabancheles', 'artilleros', 'tenían', 'establecida', 'notable', 'escuela', 'práctica', 'ventas', 'alcorcón', 'alcorcón', 'dijéramos', 'sèvres', 'actuales', 'madrileños', 'móstoles', 'acordaréis', 'órgano', 'célebre', 'alcalde', 'navalcarnero', 'pri

## Transformación ReduceByKey (K,V)=>(K,V)

In [123]:
rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
rdd2 = rdd.reduceByKey(lambda e, i: e+i)
rdd2.collect()


[(0, 13), (2, 6), (1, 3), (3, 16)]

In [147]:
PAIS=0
MEDALLAS=1
DEPORTE=2

rdd=sc.parallelize([

('España', 1, 'Snowboard'),
('Canada', 2, 'Ski'),
('España', 1, 'Patinaje'),
('Canada', 1, 'Biatlón'),
('USA', 1, 'Salto'),
('Canada', 1, 'Bobsleigh'),
        
])
pair_rdd=rdd.map(lambda terna: (terna[0], terna))
pair_rdd.collect()

[('España', ('España', 1, 'Snowboard')), ('Canada', ('Canada', 2, 'Ski')), ('España', ('España', 1, 'Patinaje')), ('Canada', ('Canada', 1, 'Biatlón')), ('USA', ('USA', 1, 'Salto')), ('Canada', ('Canada', 1, 'Bobsleigh'))]

**Ejemplo**: queremos tener en una manera de relacionar país-->num_medallas

In [148]:
import pprint
pprint.pprint(pair_rdd.reduceByKey(lambda value1, value2: value1+value2).collect())
# Para cada país se concatenan sus tuplas - es un ejemplo, pero no es algo que típicamente necesitemos

[('USA', ('USA', 1, 'Salto')),
 ('España', ('España', 1, 'Snowboard', 'España', 1, 'Patinaje')),
 ('Canada',
  ('Canada', 2, 'Ski', 'Canada', 1, 'Biatlón', 'Canada', 1, 'Bobsleigh'))]

In [149]:
pprint.pprint(pair_rdd.reduceByKey(lambda value1, value2: value1[MEDALLAS]+value2[MEDALLAS]).collect())
# No funciona ya que sumamos ints con tuplas

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 165.0 failed 4 times, most recent failure: Lost task 1.3 in stage 165.0 (TID 415, 10.0.0.7, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 174, in main
    process()
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 345, in func
    return f(iterator)
  File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 1835, in _mergeCombiners
    merger.mergeCombiners(iterator)
  File "/usr/hdp/current/spark2-client/python/pyspark/shuffle.py", line 272, in mergeCombiners
    d[k] = comb(d[k], v) if k in d else v
  File "<stdin>", lin

In [150]:
#recordamos el 
c = pair_rdd.collect()
for i in c:
    print(i)

('España', ('España', 1, 'Snowboard'))
('Canada', ('Canada', 2, 'Ski'))
('España', ('España', 1, 'Patinaje'))
('Canada', ('Canada', 1, 'Biatlón'))
('USA', ('USA', 1, 'Salto'))
('Canada', ('Canada', 1, 'Bobsleigh'))

In [151]:
pair_rdd2=pair_rdd.map(lambda pair: (pair[0], pair[1][MEDALLAS]))
pprint.pprint(pair_rdd2.collect())
# Modificamos el pair_rdd para que admita nil-ops

[('España', 1),
 ('Canada', 2),
 ('España', 1),
 ('Canada', 1),
 ('USA', 1),
 ('Canada', 1)]

In [152]:
pair_rdd2.reduceByKey(lambda value1, value2: value1+value2).collect()
# Y ahora sí que tenemos la suma del medallero

[('USA', 1), ('España', 2), ('Canada', 4)]

In [153]:
pair_rdd2.reduceByKey(lambda value1, value2: value1+value2).map(lambda x: {x[0]:x[1]}).collect()
#La cambiamos para que sean diccionarios, en vez de tuplas

[{'USA': 1}, {'España': 2}, {'Canada': 4}]

In [154]:
def combina_diccionarios(dict1,dict2):
    '''
    >>> newdict=combina_diccionarios({'k1':'v1'},{'k2':'v2'})
    {'k1': 'v1', 'k2': 'v2'}
    '''
    return {**dict1, **dict2}

In [155]:
pair_rdd2.reduceByKey(lambda value1, value2: value1+value2).map(lambda x: {x[0]:x[1]}).reduce(combina_diccionarios)

{'USA': 1, 'España': 2, 'Canada': 4}

## Transformación AggregateByKey (K,V) => (K,U)

In [156]:
pair_rdd.collect()
#Recordemos

[('España', ('España', 1, 'Snowboard')), ('Canada', ('Canada', 2, 'Ski')), ('España', ('España', 1, 'Patinaje')), ('Canada', ('Canada', 1, 'Biatlón')), ('USA', ('USA', 1, 'Salto')), ('Canada', ('Canada', 1, 'Bobsleigh'))]

In [165]:
pair_rdd.aggregateByKey(
    zeroValue=0,                                    # Valor inicial
    seqFunc=lambda acc,rdd: acc+rdd[MEDALLAS],      # Cómo añadir una rdd al acumulador?
    combFunc=lambda x,y: x+y                        # Cómo agrupar el resultado de todas las seqFunc(si hay más de una)
).collect()            

[('USA', 1), ('España', 2), ('Canada', 4)]

In [166]:
pair_rdd.aggregateByKey(
    zeroValue="",                                            # Valor inicial
    seqFunc=lambda acc,rdd: acc+","+rdd[DEPORTE],              # Cómo añadir una rdd al acumulador?
    combFunc=lambda x,y: x+  ","  +  y  # Cómo agrupar el resultado de todas las seqFunc (si hay mas de una)
).collect()                                                  

[('USA', ',Salto'), ('España', ',Snowboard,Patinaje'), ('Canada', ',Ski,,Biatlón,Bobsleigh')]

## Transformación CombineByKey (Un _AggregateByKey_ avanzado)

In [169]:
pair_rdd.combineByKey(
    createCombiner=lambda v: v[DEPORTE],              # Dada una RDD crear un valor a partir de ella
    mergeValue=lambda c,v: v[DEPORTE]+". "+c,         # Dada una RDD agregar su valor al valor ya calculado
    mergeCombiners=lambda c1,c2: c1+"; "+c2           # Agregar dos valores calculados
).collect()   

[('USA', 'Salto'), ('España', 'Patinaje. Snowboard'), ('Canada', 'Ski; Bobsleigh. Biatlón')]

## Transformación join

In [15]:
rdd1 = sc.parallelize([('k1','a'),('k1','b'),('k1','c'),('k2','d'),('k2','e'),('k3','f'),('k3','g')])
rdd2 = sc.parallelize([('k1','o'),('k1','p'),('k2','q'),('k2','r'),('k2','s'),('k5','t')])

rdd1.join(rdd2).collect()

[('k2', ('d', 's')), ('k2', ('d', 'r')), ('k2', ('d', 'q')), ('k2', ('e', 's')), ('k2', ('e', 'r')), ('k2', ('e', 'q')), ('k1', ('a', 'o')), ('k1', ('a', 'p')), ('k1', ('b', 'o')), ('k1', ('b', 'p')), ('k1', ('c', 'o')), ('k1', ('c', 'p'))]

**Ejercicio:**
Tenemos:
- 2 equipos de futbol senior de Baleares: ”Ibiza FC”, ”Mallorca FC”
- 1 equipo de futbol junior de Baleares: "Menorca Junior FC"
- 2 equipos de futbol senior de Canarias: "Tenerife FC", "Gomera FC"
- 2 equipos de futbol junior de Canarias: "Fuerteventura Junior FC" y "Lanzarote Junior FC"

Queremos hacer una competición de tal manera que todos los equipos senior de un archipiélago se enfrenten con todos los equipos senior del otro archipielago, y análogamente para los equipos junior


In [171]:
rdd_b=sc.parallelize([("Senior","Ibiza FC"), ("Senior", "Mallorca FC"), ("Junior", "Menorca Junior FC")])
rdd_c=sc.parallelize([("Senior","Tenerife FC"), ("Senior", "Gomera FC"), ("Junior", "Fuerteventura Junior FC"),("Junior", "Lanzarote Junior FC")])                      

rdd_b.join(rdd_c).map(lambda cruce: cruce[1]).collect()

[('Ibiza FC', 'Tenerife FC'), ('Ibiza FC', 'Gomera FC'), ('Mallorca FC', 'Tenerife FC'), ('Mallorca FC', 'Gomera FC'), ('Menorca Junior FC', 'Fuerteventura Junior FC'), ('Menorca Junior FC', 'Lanzarote Junior FC')]

# Un ejemplo clásico, el conteo de palabras

In [177]:
path = "adl://barcelodatalake.azuredatalakestore.net/pruebas/Nacho/quijote-2parrafos.txt"
lines = sc.textFile(path)
counts = lines.flatMap(lambda x: x.split(' ')).map(lambda w: ''.join([c for c in w if c.isalnum()])).map(lambda palabra: (palabra, 1)).reduceByKey(lambda x,y:x+y)

output=counts.collect()
output = sorted(output, key=lambda o:o[1], reverse=True)
for palabra,conteo in output:

    print(palabra,"-->",conteo)



de --> 38
que --> 21
y --> 16
los --> 13
la --> 12
en --> 8
su --> 7
a --> 7
con --> 6
se --> 6
más --> 5
razón --> 4
el --> 4
no --> 4
un --> 4
las --> 4
hidalgo --> 3
esto --> 3
vuestra --> 3
leer --> 3
para --> 2
todos --> 2
libros --> 2
como --> 2
caza --> 2
le --> 2
hacienda --> 2
muchas --> 2
caballerías --> 2
os --> 2
del --> 2
mi --> 2
punto --> 2
lo --> 2
así --> 2
parecían --> 2
Y --> 2
una --> 2
nuestro --> 2
rocín --> 2
llegaba --> 2
cuando --> 2
casa --> 2
partes --> 2
 --> 1
también --> 1
grandeza --> 1
llegó --> 1
este --> 1
añadidura --> 1
ratos --> 1
Frisaba --> 1
seco --> 1
enjuto --> 1
Mancha --> 1
años --> 1
ningunos --> 1
pues --> 1
resto --> 1
enflaquece --> 1
hanegas --> 1
perlas --> 1
velludo --> 1
comprar --> 1
entresemana --> 1
Pero --> 1
carnes --> 1
honraba --> 1
merecedora --> 1
vivía --> 1
deste --> 1
requiebros --> 1
antigua --> 1
cielos --> 1
Quesada --> 1
daba --> 1
algún --> 1
hallaba --> 1
altos --> 1
curiosidad --> 1
estaba --> 1
mozo --> 1
saber -->

**Ejercicio:**
Mejora el ejemplo anterior, siguiendo buenas prácticas de programación (por ejemplo evitando las lambdas)

In [185]:
path = "adl://barcelodatalake.azuredatalakestore.net/pruebas/Nacho/quijote-2parrafos.txt"
lines = sc.textFile(path)

def separador_y_limpieza(linea):
    palabras = linea.split()
    palabras = [c for c in palabras if c.isalnum()]
    return palabras

counts = lines.flatMap(lambda linea: separador_y_limpieza(linea)).map(lambda palabra: (palabra, 1)).reduceByKey(lambda x,y:x+y)

output=counts.collect()
output = sorted(output, key=lambda o:o[1], reverse=True)
for palabra,conteo in output:

    print(palabra,"-->",conteo)

de --> 38
que --> 21
y --> 16
los --> 12
la --> 12
en --> 8
su --> 7
a --> 7
con --> 6
se --> 6
más --> 5
no --> 4
razón --> 4
el --> 4
un --> 4
las --> 4
vuestra --> 3
hidalgo --> 2
para --> 2
os --> 2
esto --> 2
libros --> 2
como --> 2
le --> 2
muchas --> 2
del --> 2
mi --> 2
punto --> 2
lo --> 2
parecían --> 2
casa --> 2
partes --> 2
nuestro --> 2
Y --> 2
una --> 2
cuando --> 2
leer --> 2
llegaba --> 2
rocín --> 2
también --> 1
llegó --> 1
ningunos --> 1
alguna --> 1
concluían --> 1
Frisaba --> 1
divinamente --> 1
añadidura --> 1
enjuto --> 1
diferencia --> 1
suyas --> 1
resto --> 1
todos --> 1
ratos --> 1
hanegas --> 1
velludo --> 1
entresemana --> 1
Pero --> 1
merecedora --> 1
vivía --> 1
deste --> 1
aun --> 1
estaba --> 1
daba --> 1
caballerías --> 1
algún --> 1
hallaba --> 1
altos --> 1
cielos --> 1
mozo --> 1
desatino --> 1
requiebros --> 1
Una --> 1
hacen --> 1
saber --> 1
tres --> 1
gran --> 1
administración --> 1
hay --> 1
cuantos --> 1
razones --> 1
vaca --> 1
honraba --> 1

# Un último ejercicio

Obtén un un rdd de las palabras comunes a los textos "El Quijote" y "Viajes por España" que cumplan las siguientes condiciones:
* que tengan más de 4 letras  

El rdd ha de estar compuesto por diccionarios, y cada diccionario ha de tener
* La palabra
* Cuantas veces aparece en "El Quijote"
* Cuantas veces aparece en "Viajes por España"
* Cuantas veces aparece en ambos (es decir, la suma de los dos puntos anteriores)

In [193]:
def separador_y_limpieza(linea):
    palabras = linea.split()
    palabras = [c for c in palabras if c.isalnum()]
    return palabras


qpath = "adl://barcelodatalake.azuredatalakestore.net/pruebas/Nacho/quijote-2parrafos.txt"
qlines = sc.textFile(qpath)

qcounts = qlines.flatMap(lambda x: separador_y_limpieza(x)).map(lambda palabra: (palabra, 1)).reduceByKey(lambda x,y:x+y)

vpath = "adl://barcelodatalake.azuredatalakestore.net/pruebas/formacion_abril_2018/viajes_por_españa.txt"
vlines = sc.textFile(vpath)

vcounts = vlines.flatMap(lambda x: separador_y_limpieza(x)).map(lambda palabra: (palabra, 1)).reduceByKey(lambda x,y:x+y)

qcounts.join(vcounts).map(lambda t: {'Palabra':t[0], 'Q':t[1][0], 'V':t[1][1], 'Suma':t[1][0]+t[1][1]}).collect()


[{'V': 1, 'Q': 2, 'Suma': 3, 'Palabra': 'para'}, {'V': 6, 'Q': 2, 'Suma': 8, 'Palabra': 'os'}, {'V': 1, 'Q': 1, 'Suma': 2, 'Palabra': 'algún'}, {'V': 3, 'Q': 2, 'Suma': 5, 'Palabra': 'como'}, {'V': 6, 'Q': 12, 'Suma': 18, 'Palabra': 'los'}, {'V': 21, 'Q': 16, 'Suma': 37, 'Palabra': 'y'}, {'V': 1, 'Q': 1, 'Suma': 2, 'Palabra': 'nombre'}, {'V': 3, 'Q': 7, 'Suma': 10, 'Palabra': 'su'}, {'V': 3, 'Q': 6, 'Suma': 9, 'Palabra': 'con'}, {'V': 5, 'Q': 2, 'Suma': 7, 'Palabra': 'del'}, {'V': 1, 'Q': 3, 'Suma': 4, 'Palabra': 'vuestra'}, {'V': 7, 'Q': 4, 'Suma': 11, 'Palabra': 'el'}, {'V': 2, 'Q': 4, 'Suma': 6, 'Palabra': 'no'}, {'V': 1, 'Q': 1, 'Suma': 2, 'Palabra': 'bien'}, {'V': 16, 'Q': 8, 'Suma': 24, 'Palabra': 'en'}, {'V': 12, 'Q': 21, 'Suma': 33, 'Palabra': 'que'}, {'V': 1, 'Q': 2, 'Suma': 3, 'Palabra': 'muchas'}, {'V': 1, 'Q': 1, 'Suma': 2, 'Palabra': 'este'}, {'V': 3, 'Q': 2, 'Suma': 5, 'Palabra': 'lo'}, {'V': 1, 'Q': 1, 'Suma': 2, 'Palabra': 'haber'}, {'V': 1, 'Q': 1, 'Suma': 2, 'Palabra'