![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png)  ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)
# PEC2: Noviembre 2023

# Contando palabras: Construye una aplicación que cuente palabras de forma eficiente

Este laboratorio usara las tecnologías descritas en los materiales del curso sobre Spark para desarrollar una aplicación de conteo de palabras. 

Con el uso masivo de Internet y las redes sociales, el volumen de texto no estructurado esta creciendo dramáticamente, y Spark es una gran herramienta para analizar este tipo de datos. En esta PEC, vamos a escribir Código para encontrar las palabras mas comunes en un texto generado en latin, el ya conocido [Lorem Ipsum](https://www.lipsum.com/).


Lo más interesante de la forma de trabajar en esta práctica es que podría escalarse para, por ejemplo, encontrar las palabras más comunes en Wikipedia.

## Durante esta PEC vamos a cubrir:

* *Parte 1:* Creación de un RDD y un pair RDD - **1.5 PUNTOS**
* *Parte 2:* Contar palabras usando un pair RDD - **2.5 PUNTOS**
* *Parte 3:* Encontrar las palabras individuales y su frecuencia de aparición media - **1 PUNTO**
* *Parte 4:* Aplicar las funcionalidades desarrolladas a un archivo de texto* - **3 PUNTOS**
* *Parte 5:* Calcular algunos estadísticos* - **2 PUNTOS**


> Como referencia a todos los detalles de los métodos que se usan en esta práctica usar:
> * [API Python de Spark](https://spark.apache.org/docs/2.4.0/api/python/pyspark.html#pyspark.RDD)

## Formato de entrega:

Para realizar la entrega, debéis subir los dos notebooks al apartado de evaluación, con el siguiente nombre:

* usuarioUOC_22.519_PEC1_WordCount_ES.ipynb



## Parte 1: Creacion de un RDD y un pair RDDs

En esta sección, exploraremos como crear RRDs usando `parallelize` y como aplicar pair RDDs al problema del conteo de palabras.

### (0) Configuración del entorno python + spark

In [82]:
import findspark
findspark.init()
import pyspark
import random
sc = pyspark.SparkContext(master="local[1]", appName="PAC1_emadariagal")

### (1a) Creación de un RDD
Empecemos generando un RDD a partir de una lista de Python y el método `sc.parallelize`. Luego mostraremos por pantalla el tipo de la variable generada.

In [83]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
# Print out the type of wordsRDD
type(wordsRDD)

pyspark.rdd.RDD

In [84]:
# Inspeccionar SparkContent
print(sc.version) # Recuperar SparkContent Version
print(sc.pythonVer) # Recuperar Python Version
print(sc.master) # URL del clúster o del modo de ejecución local utilizado para crear el SparkContent.
print(wordsRDD.getNumPartitions()) # Numero de particiones en el RDD wordsRDD.
print(wordsRDD.glom().collect())# Visualizar contenido de cada particion de un RDD.

2.4.0-cdh6.2.0
3.5
local[1]
4
[['cat'], ['elephant'], ['rat'], ['rat', 'cat']]


### (1b) Crear el plural de las palabras y testear

Vamos a utilizar una transformación `map()` para incorporar la letra 's' a cada uno de los strings almacenados en el RDD que acabamos de crear. Vamos a definir una función de Python que devuelva una palabra, que se le ha pasado como parámetro, incorporando una "s" al final de la misma. Reemplazar el texto `<FILL IN>` con la solución propuesta. después de haber definido correctamente la función `makePlural`, ejecutar la segunda celda que contiene un assert de test. Si la solución es correcta, se imprimira `1 test passed`.

Esta será la forma habitual de trabajar en las PECs. Los ejercicios contendrán una explicación de lo que se espera, seguido de una celda de Código con uno o mas `<FILL IN>`. Las celdas que necesiten ser modificadas contendrán el texto `# TODO: Replace <FILL IN> with appropriate code` en la primera línea.

Una vez se hayan sustituido todos los `<FILL IN>` por el Código Python adecuado, ejecutar la celda, y posteriormente ejecutar la celda siguiente de test para comprobar que la solución es la esperada.


In [85]:
# definición funcion makePlural()
def makePlural(word):
    """Adds an 's' to `word`.

    Note:
        This is a simple function that only adds an 's'.  

    Args:
        word (str): A string.

    Returns:
        str: A string with 's' added to it.
    """
    #return word +'s'
    return word +'s'

makePlural('cat')

'cats'

In [86]:
# TEST Pluralize and test (1b)
assert makePlural('rat') == 'rats', 'incorrect result: makePlural does not add an s' 

### (1c) Aplicar `makePlural` a nuestro RDD

Ahora es el momento de aplicar nuestra función `makePlural()` a todos los elementos del RDD usando una transformación [map()](https://spark.apache.org/docs/2.4.0/api/python/pyspark.html#pyspark.RDD.map). Posteriormente ejecutar la acción [collect()](http://spark.apache.org/docs/2.4.0/api/python/pyspark.html#pyspark.RDD.collect) para obtener el RDD transformado.

In [87]:
# Aplicar la función makePlural() a todos los elementos del RDD usando una transformación map().
pluralRDD = wordsRDD.map(lambda x: makePlural(x))
# .collect() devuelve todos los elementos del conjunto de datos RDD como un array.
pluralRDD.collect() 

['cats', 'elephants', 'rats', 'rats', 'cats']

In [88]:
# TEST Apply makePlural to the base RDD(1c)
assert pluralRDD.collect() == ['cats', 'elephants', 'rats', 'rats', 'cats'], 'incorrect values for pluralRDD'

### (1d) Ejecutar una funcion `lambda` en un `map`

Vamos a crear el mismo RDD usando una `lambda` function en lugar de una función con nombre.

In [89]:
# Usando una transformación map conjuntamente con una función lambda aplicamos directamente el plural a cada elemento del RDD sin necesidad de usar la funcion makePlural().
pluralLambdaRDD = wordsRDD.map(lambda x: x +'s')
print(pluralLambdaRDD.collect())

['cats', 'elephants', 'rats', 'rats', 'cats']


In [90]:
# TEST Pass a lambda function to map (1d)
assert pluralLambdaRDD.collect() == ['cats', 'elephants', 'rats', 'rats', 'cats'], 'incorrect values for pluralLambdaRDD (1d)'

### (1e) Numero de caracteres de cada una de las palabras

Ahora vamos a usar un `map()` y una función lambda `lambda` para obtener el número de caracteres de cada palabra. Usaremos `collect` para guardar este resultado directamente en una variable.

In [91]:
# Obtener el número de caracteres de cada palabra del RDD
pluralLengths = (pluralRDD.map(lambda x: len(x)).collect())
print(pluralLengths)

[4, 9, 4, 4, 4]


In [92]:
# TEST Length of each word (1e)
assert pluralLengths == [4, 9, 4, 4, 4], 'incorrect values for pluralLengths'

### (1f) Pair RDDs

El siguiente paso para completar nuestro programa de conteo de palabras en crear un nuevo tipo de RDD, llamado pair RDD. Un pair RDD es un RDD donde cada elemento es un tupla del estilo `(k, v)` donde `k` es la clave y `v` es su valor correspondiente. En este ejemplo, crearemos una pair RDD consistente en tuplas con el formato `('<word>', 1)` para cada elemento de nuestro RDD básico.

Podemos crear nuestro pair RDD usando una transformación `map()` con una `lambda()` function que cree un nuevo RDD.


In [93]:
# Crear pair RDD consistente en tuplas con el formato ('<word>', 1) para cada elemento de nuestro RDD básico.
wordPairs = wordsRDD.map(lambda w : (w, 1))
print(wordPairs.collect())

[('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]


In [94]:
# TEST Pair RDDs (1f)
assert wordPairs.collect() == [('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)], 'incorrect value for wordPairs'

## Parte 2: Contar palabras usando un pair RDD

Ahora, contaremos el número de veces que una palabra en particular aparece en el RDD. Esta operación se puede realizar de una infinidad de maneras, pero algunas serán mucho menos eficientes que otras.

Un solución muy sencilla seria usar `collect()` sobre todos los elementos devolverlos al driver y allí contarlos. Mientras esta forma de trabajar podría funcionar con textos relativamente cortos, nosotros lo que queremos es poder trabajar con textos de cualquier longitud. Adicionalmente, ejecutar todo el cálculo en el driver es mucho más lento que ejecutarlo en paralelo en los workers. Por estos motivos, en esta práctica usaremos operaciones paralelizables.


### (2a) Usando `groupByKey()`
Una primera solución a nuestro problema, luego veremos que hay otras mucho más eficientes, se podría basar en la transformación [groupByKey()](http://spark.apache.org/docs/2.4.0/api/python/pyspark.html#pyspark.RDD.groupByKey). Como su nombre indica, la transformación `groupByKey()` agrupa todos los elementos de un RDD que compartan la misma clave en una única lista dentro de una de las particiones.

Esta operación plantea dos problemas:
  + Esta operación necesita mover todos los valores dentro de la partición adecuada. Esto satura la red. 
  + Las listas generadas pueden llegar a ser muy grandes llegando incluso a saturar la memoria de alguno de los trabajadores
  
Utiliza `groupByKey()` para generar un pair RDD del tipo `('word', iterator)`.


In [95]:
# Aplicar groupByKey() para generar un pair RDD del tipo ('word', iterator).
# Note that groupByKey requires no parameters
wordsGrouped = wordPairs.groupByKey()
for key, value in wordsGrouped.collect():
    print('{0}: {1}'.format(key, list(value)))

cat: [1, 1]
elephant: [1]
rat: [1, 1]


In [96]:
# TEST groupByKey() approach (2a)
assert sorted(wordsGrouped.mapValues(lambda x: list(x)).collect()) == [('cat', [1, 1]), ('elephant', [1]), ('rat', [1, 1])], 'incorrect value for wordsGrouped'

### (2b) Utiliza `groupByKey()` para obtener los conteos

Usando la transformación `groupByKey()` crea un RDD que contenga 2 elementos, donde cada uno de ellos sea un par palabra (clave) iterador de Python (valor).

Luego suma todos los valores de iterador usando una transformación `map()`. El resultado debe ser un pair RDD que contenga las parejas (word, count).


In [97]:
# Suma de todos los valores de iterador usando la transformacion map() y aplicando la funcion lambda para obtener la estructura (word, count).
wordCountsGrouped = wordsGrouped.map(lambda x: (x[0], sum(x[1])))
print(wordCountsGrouped.collect())

[('cat', 2), ('elephant', 1), ('rat', 2)]


In [98]:
# TEST Use groupByKey() to obtain the counts (2b)
assert sorted(wordCountsGrouped.collect())==[('cat', 2), ('elephant', 1), ('rat', 2)],'incorrect value for wordCountsGrouped'

### (2c) Conteo usando `reduceByKey`

Una mejor solución es comenzar desde un pair RDD y luego usar la transformación [reduceByKey()](http://spark.apache.org/docs/2.4.0/api/python/pyspark.html#pyspark.RDD.reduceByKey) para crear un nuevo pair RDD. La transformación `reduceByKey()` agrupa todas las parejas que comparten la misma clave. Posteriormente aplica la funcion que se le pasa por parámetro agrupando los valores de dos en dos. Este proceso se repite iterativamente hasta que obtenemos un único valor agregado para cada una de las claves del pair RDD. `reduceByKey()` opera aplicando la función primero dentro de cada una de las particiones de forma independiente, y posteriormente únicamente comparte los valores agregados entre particiones diferentes, permitiéndole escalar de forma eficiente ya que no tiene necesidad de desplazar por la red una gran cantidad de datos.

In [99]:
# Usar la transformación reduceByKey() para crear un nuevo pair RDD  agrupando todas las parejas que comparten la misma clave , posteriormente aplica la funcion que se le pasa por parámetro.
# Note that reduceByKey takes in a function that accepts two values and returns a single value
wordCounts = wordPairs.reduceByKey(lambda x,y : x + y )
print (wordCounts.collect())

[('cat', 2), ('elephant', 1), ('rat', 2)]


In [100]:
# TEST Counting using reduceByKey (2c)
assert sorted(wordCounts.collect())==[('cat', 2), ('elephant', 1), ('rat', 2)],'incorrect value for wordCounts'

### (2d) Ahora todo junto

La versión mas compleja del Código ejecuta primero un `map()` sobre el pair RDD, la transformación `reduceByKey()`, y finalmente la acción `collect()` en una única línea de Código.

In [101]:
# Pipeline de los pasos en una línea de código.
wordCountsCollected = (wordsRDD
                       .map(lambda w : (w, 1))
                       .reduceByKey(lambda x,y : x + y)
                       .collect())
print(wordCountsCollected)
print(type(wordCountsCollected))

[('cat', 2), ('elephant', 1), ('rat', 2)]
<class 'list'>


In [102]:
# TEST All together (2d)
assert sorted(wordCountsCollected)==[('cat', 2), ('elephant', 1), ('rat', 2)],'incorrect value for wordCountsCollected'

## Parte 3: Encontrar las palabras individuales y su frecuencia de aparición media

### (3a) Palabras únicas

Calcular el número de palabras únicas en `wordsRDD`. Puedes utilizar otros RDDs que hayas creado en esta práctica si te resulta más sencillo.


In [103]:
# Cálculo del número de palabras únicas en wordsRDD.
# wordsRDD = [['cat'], ['elephant'], ['rat'], ['rat', 'cat']]
uniqueWords = len(wordsRDD
               .map(lambda w : (w, 1))
               .reduceByKey(lambda x,y : x + y)
               .collect())
print(uniqueWords)

3


In [104]:
# TEST Unique words (3a)
assert uniqueWords== 3, 'incorrect count of uniqueWords'

### (3b) Calcular la media usando `reduce()`

Encuentra la frecuencia media de aparición de palabras en `wordCounts`.

Utiliza la acción `reduce()` para sumar los conteos en `wordCounts` y entonces divide por el número de palabras únicas. Para realizar esto primero aplica un `map()` al pair RDD `wordCounts`, que está formado por tuplas con el formato (key, value), para convertirlo en un RDD de valores.


In [105]:
print(type(wordCounts))
print(wordCounts.take(3))

<class 'pyspark.rdd.PipelinedRDD'>
[('cat', 2), ('elephant', 1), ('rat', 2)]


In [106]:
# Para calcular la media se obtiene la suma de los valores asociados a cada una de las claves, y se divide por el número de palabras únicas calculadas en el paso anterior.
from operator import add
totalCount = (wordCounts
              .map(lambda x: x[1])
              .reduce(lambda x, y: x + y))

average = totalCount / float(uniqueWords)
print(totalCount)
print(round(average, 2))

5
1.67


In [107]:
# TEST Mean using reduce (3b)
assert round(average, 2)==1.67, 'incorrect value of average'

## Parte 4: Aplicar las funcionalidades desarrolladas a un archivo de texto

Para esto hemos de construir una función `wordCount`, capaz de trabajar con datos del mundo real que suelen presentan problemas como el uso de mayúsculas o minúsculas, puntuación, acentos, etc. Posteriormente, cargar los datos de nuestra fuente de datos y finalmente, calcular el conteo de palabras sobre los datos procesados.

### (4a) función `wordCount`

Primero, define una función para el conteo de palabras. deberías reusar las técnicas que has visto en los apartados anteriores de esta práctica. Dicha función, ha de tomar un RDD que contenga una lista de palabras, y devolver un pair RDD que contenga todas las palabras con sus correspondientes conteos.


In [108]:
def wordCount(wordListRDD):
    """Creates a pair RDD with word counts from an RDD of words.

    Args:
        wordListRDD (RDD of str): An RDD consisting of words.

    Returns:
        RDD of (str, int): An RDD consisting of (word, count) tuples.
    """
    return wordListRDD.map(lambda w : (w, 1)).reduceByKey(lambda x,y : x + y)
                           
print(wordCount(wordsRDD).collect())

[('cat', 2), ('elephant', 1), ('rat', 2)]


In [109]:
# TEST wordCount function (4a)
assert sorted(wordCount(wordsRDD).collect())==[('cat', 2), ('elephant', 1), ('rat', 2)],'incorrect definition for wordCount function'

### (4b) mayúsculas y puntuación

Los ficheros del mundo real son mucho más complejos que los que hemos estado usando en esta PAC. Algunos de los problemas que son necesarios de solucionar son:
  + Las palabras deben de contarse independientemente de si están en mayúscula o minúscula (por ejemplo, Spark y spark deberían contarse como la misma palabra).
  + Todos los signos de puntuación han de eliminarse.
  + Cualquier espacio al principio o al final de la palabra ha de eliminarse.
  
Define la función `removePunctuation` que convierta todo el texto a minúsculas, elimine los signos de puntuación, y elimine los espacios al principio y fin de cada palabra. Usa el módulo de Python [re](https://docs.python.org/2/library/re.html) para eliminar cualquier carácter que no sea una letra, un numero o un espacio.

Sino estas familiarizado con las expresiones regulares deberías revisar [este tutorial](https://developers.google.com/edu/python/regular-expressions). Alternativamente, [esta web](https://regex101.com/#python) es de gran ayuda para debugar tus expresiones regulares.

**Hints**

1. Usa la función [re.sub()](https://docs.python.org/2.7/library/re.html#re.sub).
2. Para nuestros propósitos, "puntuación" significa "no alfabético, numérico, o espacio." La expresión regular que define estos caracteres es: `[^A-Za-z\s\d]`
3. No usar `\W`, ya que retendrá los guiones bajos.


In [110]:
# re.sub() se reemplace cada carácter  distinto al patron [^A-Za-z\s\d], es decir, elementos de puntuación por cadena vacia ''.
# .lower() devuelve una cadena donde todos los caracteres están en minúsculas.
# .strip() elimina los espacios en blanco iniciales y finales.
import re
def removePunctuation(text):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only whitespace, letters, and numbers should be retained.  Other characters should should be
        eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
        punctuation is removed.

    Args:
        text (str): A string.

    Returns:
        str: The cleaned up string.
    """
    return re.sub(r'[^A-Za-z\s\d]', '', text).lower().strip()
    
print(removePunctuation('Hi, you!'))
print(removePunctuation(' No under_score!'))
print(removePunctuation(' *      Remove punctuation then spaces  * '))

hi you
no underscore
remove punctuation then spaces


In [111]:
# TEST Capitalization and punctuation (4b)
assert removePunctuation(" The Elephant's 4 cats. ") == 'the elephants 4 cats', 'incorrect definition for removePunctuation function'

### (4c) Cargar un fichero de texto

Para la siguiente parte, usaremos el texto ya mencionado Lorem Ipsum generado para la práctica. Para convertir un fichero de texto en un RDD, usaremos el método `SparkContext.textFile()`. También usaremos la función que acabamos de crear `removePunctuation()` dentro de una transformación `map()` para eliminar todos los caracteres no alfabéticos, numéricos o espacios. Dado que el fichero es bastante grande, usaremos `take(15)`, de forma que tan solo imprimiremos por pantalla las 15 primeras líneas


In [112]:
# Tan solo ejecuta este codigo
import os.path

fileName = os.path.join('/aula_22.419/data/', 'LoremIpsum.txt')
loremRDD = sc.textFile(fileName, 8).map(removePunctuation).filter(lambda x: len(x)>0)
loremRDD.take(10)

['aut minima deleniti et autem minus illo esse dolores eligendi corrupti dolore minima nostrum eos nobis nam nihil aspernatur nam ut quae sint laborum ut dolores error possimus aperiam consequatur',
 'pariatur sed quo non itaque qui pariatur saepe ad quis consequatur nihil iste molestias et eos ut expedita vel reiciendis dolorem enim doloribus quam architecto aperiam',
 'sed repudiandae pariatur similique est aut sequi animi in aperiam enim ipsa enim dolorem inventore aut quo odio in consequatur et',
 'aspernatur ad esse et aliquid itaque dolores rerum quia commodi explicabo non magnam nostrum consectetur non sint eum nulla et aut quis doloribus itaque nulla molestiae quis est est quo facilis incidunt a ipsa in itaque sed aut nobis facere dignissimos atque unde cum ea vero',
 'tenetur vel quod voluptatum laudantium dolores neque aut est modi qui aperiam itaque aperiam quae ratione doloremque aut delectus quas qui',
 'qui placeat vel ipsam praesentium sint recusandae dicta minus praesen


### (4d) Extraer las palabras de las líneas

Antes de poder usar la función `wordcount()`, hemos de solucionar dos problemas con el formato del RDD:
  + El primer problema es que necesitamos dividir cada línea por sus espacios. ** Esto lo solucionaremos en el apartado (4d). **
  + El segundo problema es que necesitamos filtrar las líneas completamente vacías. ** Esto lo solucionaremos en el apartado (4e). **

Para aplicar una transformación que divida cada elemento del RDD por sus espacios, hemos de aplicar la función incorporada en los strings de Python [split()](https://docs.python.org/2/library/string.html#string.split). Cuidado que a primera vista puede parecer que la función necesaria es una transformación `map()`, pero si piensas un poco mas sobre el resultado de la función `split()` te darás cuenta que esta no es la opción correcta.

> Nota:
> * No uséis la implementación estándar del `split()`, debéis pasar un valor de separación. Por ejemplo, para dividir `line` por comas, usa `line.split(',')`.


In [113]:
# flatMap() Devuelve un nuevo RDD aplicando primero una función (lambda y a su vez, esta nos permite aplicar la funcion split(' ')) y continuación aplanar los resultados.
loremWordsRDD = loremRDD.flatMap(lambda x: x.split(' '))
loremWordsCount = loremWordsRDD.count()
print(loremWordsRDD.top(5))
print(loremWordsCount)

['voluptatum', 'voluptatum', 'voluptatum', 'voluptatum', 'voluptatum']
29249699


In [114]:
# TEST Words from lines (4d)
# This test allows for leading spaces to be removed either before or after
# punctuation is removed.
assert loremWordsCount == 29249699, 'incorrect value for loremWordsCount'
assert loremWordsRDD.top(5)==[u'voluptatum', u'voluptatum', u'voluptatum', u'voluptatum', u'voluptatum'], 'incorrect value for loremWordsRDD'

### (4e) Calcula palabras distintas

El siguiente paso es contar cuantas palabras distintas contiene nuestro texto. Puedes usar las transformaciones map() y reduceByKey() ya utilizadas anteriormente.

In [115]:
# La función .map(lambda w: (w, 1)) se utiliza para mapear cada palabra w en un par clave-valor (w, 1).  es decir, se asigna el valor 1 a cada palabra.
# La función .reduceByKey(lambda x, y: x + y) se utiliza para realizar una reducción por clave, es decir, va a ir sumando aquellos valores asociados a claves comunes.
distintWordsMapRDD = loremWordsRDD.map(lambda w: (w, 1)).reduceByKey(lambda x, y: x + y)

# .keys() se utiliza para obtener las claves de un RDD o un DataFrame.
# la función .distinct() se utiliza para eliminar los elementos duplicados de un RDD o un dataframes 
# Seleccionar claves unicas o lo que es lo mismo sin duplicados.
distintWordsRDD=distintWordsMapRDD.keys().distinct()

# Mostrar las 8 primeras claves del RDD
print(distintWordsRDD.take(8))  
# Contar el total de claves
print(distintWordsRDD.count())

['tempora', 'sapiente', 'vitae', 'nisi', 'quidem', 'consectetur', 'perferendis', 'debitis']
182


In [116]:
# TEST Remove empty elements (4e)
assert distintWordsRDD.count()== 182, 'incorrect value for shakeWordCount'

### (4f) Cuenta las palabras

Ahora que tenemos un RDD que contiene solo palabras. El siguiente paso es aplicar la función `wordCount()` para producir una lista con los conteos de palabras. Podemos ver las 15 más comunes usando la acción `takeOrdered()`; sin embargo, como los elementos del RRD son pares, necesitamos una función especial que ordene los pares de la forma correcta.

Usad las funciones  `wordCount()` y `takeOrdered()` para obtener las 15 palabras más comunes junto con sus conteos.


In [117]:
# La función .takeOrdered(num, key=None) permite obtener los N elementos de un RDD ordenados en orden ascendente o  descentente según lo especificado en el parametro Key Opcional.
# key = lambda x: -x[1] permite ordendar por el seundo elemento de la tupla y con el - invertir el orden , en este caso a descendente (de mayor a menor)
top15WordsAndCounts = wordCount(loremWordsRDD).takeOrdered(15, key = lambda x: -x[1])
print(top15WordsAndCounts)

[('et', 1291567), ('aut', 705322), ('ut', 704966), ('qui', 704703), ('est', 587782), ('voluptatem', 469634), ('quia', 469401), ('rerum', 353054), ('sed', 352873), ('omnis', 352653), ('consequatur', 352436), ('sit', 352396), ('non', 351959), ('voluptas', 351127), ('dolorem', 235822)]


In [118]:
# TEST Count the words (4f)
assert top15WordsAndCounts== [('et', 1291567), ('aut', 705322), 
                              ('ut', 704966), ('qui', 704703), 
                              ('est', 587782), ('voluptatem', 469634), 
                              ('quia', 469401), ('rerum', 353054), ('sed', 352873),
                              ('omnis', 352653), ('consequatur', 352436), 
                              ('sit', 352396), ('non', 351959), ('voluptas', 351127), 
                              ('dolorem', 235822)],'incorrect value for top15WordsAndCounts'

## Parte 5: Calcular algunos estadísticos

Usando las mismas técnicas que has aplicado en los ejercicios anteriores responde a las siguientes preguntas:


### (5a) De todas las palabras que tienen más de 6 letras ¿cuántas tienen exactamente una 'e' y una 'a'?


In [119]:
recuento_palabras = wordCount(loremWordsRDD)

In [133]:
recuento_palabras.take(2)

[('tempora', 117602), ('sapiente', 118045)]

In [120]:
def count_a_e(word):
    num_a = word.count('a') 
    num_e = word.count('e')
    
    if num_a == 1 and num_e == 1:
        return True
    else:
        return False

In [121]:
print(count_a_e('tempora'))
print(count_a_e('necessitatibus'))

True
False


In [122]:
recuento_palabras.map(lambda x: (x[0], len(x[0]))).filter(lambda x : x[1] > 6).filter(lambda x : count_a_e(x[0])).take(15)

[('tempora', 7),
 ('consequatur', 11),
 ('voluptatem', 10),
 ('ratione', 7),
 ('voluptate', 9),
 ('veritatis', 9),
 ('voluptates', 10),
 ('molestias', 9),
 ('cupiditate', 10),
 ('maiores', 7),
 ('explicabo', 9),
 ('architecto', 10),
 ('perspiciatis', 12)]

### (5b) ¿Cuál es la longitud media de todas las palabras diferentes que comienzan con 'c'?

In [None]:
totalCount = (wordCounts
              .map(lambda x: x[1])
              .reduce(lambda x, y: x + y))

average = totalCount / float(uniqueWords)
print(totalCount)
print(round(average, 2))

In [130]:
def comienzan_por_c(word):
    if word[0] == 'c':
        return True
    else:
        return False

In [131]:
print(comienzan_por_c('consequatur'))
print(comienzan_por_c('explicabo'))

True
False


In [169]:
recuento_palabras.filter(lambda x: comienzan_por_c(x[0])).map(lambda x: (x[0], len(x[0]))).map(lambda x: x[1]).take(15)

[11, 11, 12, 8, 6, 5, 8, 3, 10, 7]

In [175]:
unique_words = recuento_palabras.filter(lambda x: comienzan_por_c(x[0])).map(lambda x: (x[0], len(x[0]))).map(lambda x: x[1]).collect()
print(type(unique_words))
print(unique_words)

<class 'list'>
[11, 11, 12, 8, 6, 5, 8, 3, 10, 7]


In [159]:
# Listado de valores que representan la longitud de cada una de las palabra que comienzan por c.
totalCount = recuento_palabras.filter(lambda x: comienzan_por_c(x[0])).map(lambda x: (x[0], len(x[0]))).map(lambda x: x[1]).reduce(lambda x, y: x + y)
print(totalCount)

81


In [176]:
average = totalCount /len(unique_words)
print(totalCount)
print(round(average, 2))

81
8.1


### (5c) ¿Cuántas palabras distintas tienen exactamente tres vocales?

In [181]:
recuento_palabras.count()

182

In [191]:
def count_vocals(word):
    vocals = 'aeiou'
    num_vocals = []
    
    for w in word:
        if w in vocals:
            num_vocals.append(w)
    
    if len(num_vocals) == 3:
        return True
    else:
        return False                 

In [205]:
print(recuento_palabras.filter(lambda x: count_vocals(x[0])).count())
recuento_palabras.filter(lambda x: count_vocals(x[0])).map(lambda w: w[0]).take(15)

53


['tempora',
 'quidem',
 'debitis',
 'eius',
 'earum',
 'vitae',
 'placeat',
 'impedit',
 'libero',
 'autem',
 'corrupti',
 'voluptas',
 'animi',
 'iure',
 'repellat']

In [206]:
len(recuento_palabras.filter(lambda x: count_vocals(x[0])).collect())

53