## Instalación de librerías necesarias

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.4.2/spark-3.4.2-bin-hadoop3.tgz
!tar xf spark-3.4.2-bin-hadoop3.tgz
!pip install -q findspark

### Cargamos el entorno

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"
import findspark
findspark.init("spark-3.4.2-bin-hadoop3")# SPARK_HOME

In [3]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

### Importamos pyspark y creamos una sessión

In [4]:
import findspark
findspark.init("spark-3.4.2-bin-hadoop3")# SPARK_HOME
from pyspark.sql import SparkSession
ss = SparkSession.builder.master("local[*]").getOrCreate()

## Nuestros primer conjunto de datos en un cluster
Creamos una lista en local y la subimos al cluster

Los datos se almacenan en una estructura de datos distribuida denominada [RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html)

In [5]:
data = ['apple', 'orange', 'banana', 'grape', 'watermelon', 'apple', 'orange', 'apple']
# Subimos los datos al cluster
distData = ss.sparkContext.parallelize(data)
distData

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:287

### Operaciones básicas

In [6]:
# Contar número de elementos/filas
distData.count()

8

#### Obtener elementos
Obtener el/la primer/a elemento/fila

In [7]:
distData.first()

'apple'

Obtener x primeros elementos

In [9]:
x = 1
print(distData.take(x))
x = 3
print(distData.take(x))

['apple']
['apple', 'orange', 'banana']


Obtener x primeros elementos por orden ASCII

In [12]:
distData.takeOrdered(5)

['apple', 'apple', 'apple', 'banana', 'grape']

Obtener todos los elementos.

Cuidado, trae toda la información del conjunto de datos distribuido en el cluster a la máquina local pudiendo producir un desbordamiento de la memoria

In [11]:
distData.collect()

['apple',
 'orange',
 'banana',
 'grape',
 'watermelon',
 'apple',
 'orange',
 'apple']

#### Transformaciones básicas
Tenemos a nuestra disposición los métodos de programación funcional típicos

In [13]:
dir(distData)

['__add__',
 '__annotations__',
 '__class__',
 '__class_getitem__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getnewargs__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__orig_bases__',
 '__parameters__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__slots__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_computeFractionForSampleSize',
 '_defaultReducePartitions',
 '_id',
 '_is_barrier',
 '_is_protocol',
 '_jrdd',
 '_jrdd_deserializer',
 '_memory_limit',
 '_pickled',
 '_reserialize',
 '_to_java_object_rdd',
 'aggregate',
 'aggregateByKey',
 'barrier',
 'cache',
 'cartesian',
 'checkpoint',
 'cleanShuffleDependencies',
 'coalesce',
 'cogroup',
 'collect',
 'collectAsMap',
 'collectWithJobGroup',
 'combineByKey',
 'context',
 'count',
 'countApprox',
 'countApproxDistinct',
 'countByKey',
 'countByVa

Utiliza `map` para concatenar la palabra fruit a cada fruta

In [23]:
#TODO
fruits = distData.map(lambda x: x + ' fruit')

fruits.collect()

['apple fruit',
 'orange fruit',
 'banana fruit',
 'grape fruit',
 'watermelon fruit',
 'apple fruit',
 'orange fruit',
 'apple fruit']

Cuenta el número de caracteres de cada cadena

In [25]:
# TODO
distData.map(len).collect()

[5, 6, 6, 5, 10, 5, 6, 5]

Cuenta el número total de caracteres

In [29]:
#TODO
distData.map(len).reduce(lambda x, y: x + y)

48

Obtener los distintos tipos de fruta existentes

Utilizar `distinct()`

In [30]:
#TODO
distData.distinct().collect()

['orange', 'banana', 'watermelon', 'apple', 'grape']

### Pair RDD

Es un tipo de estructura de datos particular que permite trabajar pares de clave-valor donde realizar [operaciones](https://spark.apache.org/docs/latest/rdd-programming-guide.html#working-with-key-value-pairs) que impliquen agrupamiento por clave.

Podemos crear un `RDD` de tipo Pair a partir del conjunto de datos anterior.

In [31]:
pairRDD = distData.map(lambda x: (x, 1))
pairRDD

PythonRDD[21] at RDD at PythonRDD.scala:53

Obtener los distintos tipos de fruta existentes

Ahora utilizar `groupByKey()` y guardar el resultado en `groupedRDD`

In [32]:
#TODO
groupedRDD = pairRDD.groupByKey()
groupedRDD.collect()

[('orange', <pyspark.resultiterable.ResultIterable at 0x78725f7de8c0>),
 ('banana', <pyspark.resultiterable.ResultIterable at 0x78725f7dc8b0>),
 ('watermelon', <pyspark.resultiterable.ResultIterable at 0x78725f7dd360>),
 ('apple', <pyspark.resultiterable.ResultIterable at 0x78725f7dd930>),
 ('grape', <pyspark.resultiterable.ResultIterable at 0x78725f7df880>)]

`groupedRDD` contiene un listado con las distintas claves existentes en el `pairRDD` y un iterador a los valores de cada clave.

Podemos recopilar todos esos elmentos aplicando la función `list` a cada iterador

In [34]:
#TODO
groupedRDD.mapValues(list).collect()

[('orange', [1, 1]),
 ('banana', [1]),
 ('watermelon', [1]),
 ('apple', [1, 1, 1]),
 ('grape', [1])]

Contar número de veces que aparece cada fruta.

Utilizar `reduceByKey()` con `pairRDD`

y

`mapValues` con `groupedRDD`

¿Cuál es más eficiente?

In [36]:
#TODO
pairRDD.reduceByKey(lambda x, y: x + y).collect()

[('orange', 2), ('banana', 1), ('watermelon', 1), ('apple', 3), ('grape', 1)]

In [39]:
#TODO
groupedRDD.mapValues(len).collect()

[('orange', 2), ('banana', 1), ('watermelon', 1), ('apple', 3), ('grape', 1)]

Tambien se puede hacer con `countByKey()`
Cuidado para conjuntos de datos grandes cuyo resultado no es posible guardar en local

In [40]:
#TODO
pairRDD.countByKey()

defaultdict(int,
            {'apple': 3,
             'orange': 2,
             'banana': 1,
             'grape': 1,
             'watermelon': 1})

Cuenta el número de apariciones de cada fruta a partir de `distData`

In [42]:
#TODO
distData.map(lambda word: (word,1)).reduceByKey(lambda x, y: x + y).collect()

[('orange', 2), ('banana', 1), ('watermelon', 1), ('apple', 3), ('grape', 1)]

Encuentra aquellas frutas que solo aparecen una vez.

Utilizar `filter`

In [58]:
#TODO
distData.map(lambda word: (word,1)).reduceByKey(lambda x, y: x+y).filter(lambda x: x[1]==1).collect()

[('banana', 1), ('watermelon', 1), ('grape', 1)]

## EJERCICIO
Apliquemos lo aprendido sobre el archivo `shakespeare.txt`

In [None]:
import requests

# URL del archivo crudo en GitHub (asegúrate de usar el enlace directo al raw content)
url = 'https://raw.githubusercontent.com/CarlosVU/testspark/master/shakespeare.txt'

# Descarga el archivo
respuesta = requests.get(url)

# Nombre del archivo local donde se guardará el contenido
nombre_archivo_local = '/tmp/shakespeare.txt'

# Asegúrate de que la petición fue exitosa
if respuesta.status_code == 200:
    # Escribe el contenido en un archivo local
    with open(nombre_archivo_local, 'wb') as archivo:
        archivo.write(respuesta.content)
else:
    raise Exception('Error al descargar el archivo. Código de estado HTTP:', respuesta.status_code)

# Ahora que el archivo está en tu sistema de archivos local, usa Spark para leerlo
fileRDD = ss.sparkContext.textFile(nombre_archivo_local)
sample = fileRDD.takeSample(False, 10)

# Imprime el resultado
print(sample)

In [None]:
fileRDD = ss.sparkContext.textFile("/content/shakespeare.txt")
fileRDD.takeSample(False, 10)

#### Obtener las 10 palabras que aparecen con más frecuencia y su frecuencia absoluta

Algunas funciones útiles que no has utilizado aún


*   `flatMap`
*   `split`

Ten en cuenta que las palabras pueden tener letras mayusculas que hacen que se identifiquen como palabras diferentes. Utiliza la funcion `lower` the `str` para poner todas las palabras en letras minusculas.


In [None]:
#TODO
diez_mas_frecuentes = fileRDD.

#### Extra: generar histograma con el recuento de las diez palabras más frecuentas.
Utilizar `barplot` de Seaborn.
Seaborn espera un `DataFrame` de Pandas.

In [None]:
df = pd.DataFrame(diez_mas_frecuentes_sol, columns =['word', 'count'])
df

In [None]:
#TODO
plt = sns.barplot(