## Instalación de librerías necesarias

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
!tar xf spark-3.3.0-bin-hadoop3.tgz
!pip install -q findspark

El sistema no puede encontrar la ruta especificada.
"wget" no se reconoce como un comando interno o externo,
programa o archivo por lotes ejecutable.
"tar" no se reconoce como un comando interno o externo,
programa o archivo por lotes ejecutable.

[notice] A new release of pip available: 22.2.2 -> 22.3
[notice] To update, run: python.exe -m pip install --upgrade pip


### Cargamos el entorno

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"

In [3]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

### Importamos pyspark y creamos una sessión

In [4]:
import findspark
findspark.init("spark-3.3.0-bin-hadoop3")# SPARK_HOME
from pyspark.sql import SparkSession
ss = SparkSession.builder.master("local[*]").getOrCreate()

Exception: Unable to find py4j in spark-3.3.0-bin-hadoop3\python, your SPARK_HOME may not be configured correctly

## Nuestros primer conjunto de datos en un cluster
Creamos una lista en local y la subimos al cluster

Los datos se almacenan en una estructura de datos distribuida denominada [RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html)

In [None]:
data = ['apple', 'orange', 'banana', 'grape', 'watermelon', 'apple', 'orange', 'apple']
# Subimos los datos al cluster
distData = ss.sparkContext.parallelize(data)
distData

### Operaciones básicas

In [None]:
# Contar número de elementos/filas
distData.count()

#### Obtener elementos
Obtener el/la primer/a elemento/fila

In [None]:
distData.first()

Obtener x primeros elementos

In [None]:
x = 1
print(distData.take(x))
x = 3
print(distData.take(x))


Obtener x primeros elementos por orden ASCII

In [None]:
distData.takeOrdered(x)


Obtener todos los elementos. 

Cuidado, trae toda la información del conjunto de datos distribuido en el cluster a la máquina local pudiendo producir un desbordamiento de la memoria

In [None]:
distData.collect()

#### Transformaciones básicas
Tenemos a nuestra disposición los métodos de programación funcional típicos

In [None]:
dir(distData)

Utiliza `map` para concatenar la palabra fruit a cada fruta

In [None]:
#TODO
fruits = distData.

Cuenta el número de caracteres de cada cadena

In [None]:
# TODO
distData.

Cuenta el número total de caracteres

In [None]:
#TODO
distData.

Obtener los distintos tipos de fruta existentes

Utilizar `distinct()`

In [None]:
#TODO
distData.

### Pair RDD

Es un tipo de estructura de datos particular que permite trabajar pares de clave-valor donde realizar [operaciones](https://spark.apache.org/docs/latest/rdd-programming-guide.html#working-with-key-value-pairs) que impliquen agrupamiento por clave.

Podemos crear un `RDD` de tipo Pair a partir del conjunto de datos anterior.

In [None]:
pairRDD = distData.map(lambda x: (x, 1))
pairRDD

Obtener los distintos tipos de fruta existentes

Ahora utilizar `groupByKey()` y guardar el resultado en `groupedRDD`

In [None]:
#TODO
groupedRDD = 
groupedRDD.collect()

`groupedRDD` contiene un listado con las distintas claves existentes en el `pairRDD` y un iterador a los valores de cada clave. 

Podemos recopilar todos esos elmentos aplicando la función `list` a cada iterador

In [None]:
#TODO
groupedRDD.mapValues(

Contar número de veces que aparece cada fruta.

Utilizar `reduceByKey()` con `pairRDD`

y 

`mapValues` con `groupedRDD`

¿Cuál es más eficiente?

In [None]:
#TODO
pairRDD.reduceByKey(

In [None]:
#TODO
groupedRDD.mapValues(

Tambien se puede hacer con `countByKey()`
Cuidado para conjuntos de datos grandes cuyo resultado no es posible guardar en local

In [None]:
#TODO
pairRDD.

Cuenta el número de apariciones de cada fruta a partir de `distData`

In [None]:
#TODO

Encuentra aquellas frutas que solo aparecen una vez.

Utilizar `filter`

In [None]:
#TODO

## EJERCICIO
Apliquemos lo aprendido sobre el archivo `shakespeare.txt`

In [None]:
fileRDD = ss.sparkContext.textFile("/content/shakespeare.txt")
fileRDD.takeSample(False, 10)

#### Obtener las 10 palabras que aparecen con más frecuencia y su frecuencia absoluta

Algunas funciones útiles que no has utilizado aún


*   `flatMap`
*   `split`

Ten en cuenta que las palabras pueden tener letras mayusculas que hacen que se identifiquen como palabras diferentes. Utiliza la funcion `lower` the `str` para poner todas las palabras en letras minusculas.


In [None]:
#TODO
diez_mas_frecuentes = fileRDD.

#### Extra: generar histograma con el recuento de las diez palabras más frecuentas.
Utilizar `barplot` de Seaborn. 
Seaborn espera un `DataFrame` de Pandas.

In [None]:
df = pd.DataFrame(diez_mas_frecuentes_sol, columns =['word', 'count'])
df

In [None]:
#TODO
plt = sns.barplot(