# RDDs con PySpark

En este notebook aprenderemos cómo crear una sesión de Spark (SparkSession) y cómo crear RDDs a partir de colecciones de Python y de ficheros externos.

En primer lugar, debemos asegurarnos de que Java está instalado. Configuramos la variable `JAVA_HOME`, que indica a Spark dónde encontrar la JVM contra la que ejecutar su código Scala.

In [31]:
import os

# En nuestro ordenador personal, si no esta definida la variable JAVA_HOME, deberemos indicarla
# Para sistemas basados en Debian/Ubuntu, si tenemos instalada la version 17 de Java, seria:
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
# En Windows, la ruta puede ser algo como: "C:\\Program Files\\Java\\jdk-17"
# os.environ["JAVA_HOME"] = "C:\\Program Files\\Java\\jdk-17"
# Si ya esta definida, no es necesario hacer nada
# os.environ["JAVA_HOME"]

# En Google Colab no es necesario hacer nada

A continuación, podemos importar PySpark y comprobar su versión para asegurarnos de que está correctamente instalado.

In [1]:
import pyspark

pyspark.__version__

'4.0.1'

Ya estamos listos para crear una sesión de Spark y empezar a trabajar con RDDs.

In [2]:
from pyspark.sql import SparkSession

# Crear sesión de Spark
spark = SparkSession.builder.appName("Analisis Deportivo con Spark").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/15 14:29:59 WARN Utils: Your hostname, maes, resolves to a loopback address: 127.0.1.1; using 10.0.68.169 instead (on interface wlp0s20f3)
25/10/15 14:29:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/15 14:30:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Una vez creado un objeto `SparkSession`, podemos inspeccionarlo. Ahora mismo lo estamos utilizando en modo local, con todos los núcleos de nuestro ordenador -> [*].

In [3]:
spark

### Ejemplo 1

Vamos a crear un RDD a partir de una colección de Python. En este caso, una lista con información sobre jugadores y su distancia recorrida en un partido de fútbol.

In [4]:
# Datos simulados: (jugador, distancia recorrida en km)
datos = [("Jugador1", 10.2), ("Jugador2", 9.8), ("Jugador1", 11.0)]

# Creamos un RDD a partir de la lista
rdd = spark.sparkContext.parallelize(datos)

# Distancia total por jugador
distancia_total = rdd.reduceByKey(lambda a, b: a + b)
distancia_total.collect()

                                                                                

[('Jugador2', 9.8), ('Jugador1', 21.2)]

### Ejemplo 2

Veremos una implementación del clásico problema de contar palabras (word count) utilizando RDDs. En este caso, el RDD se crea a partir de un fichero de texto.

Si estamos trabajando en local con Jupyter Notebook/Lab, accedemos directamente utilizando la carpeta `data/` (deberás comentar las opciones de Google Collab para que no sobrescriba)

In [8]:
ruta_fichero="data/wordcount_data.txt"

Si estamos en Google Collab tenemos 2 opciones:

**Opción 1: Subir un fichero de datos a mano:**
- Abrirmos el directorio actual (:file_folder:) y arrastramos el fichero que queremos utilizar (wordcount_data.txt)
- Cuando cerremos Google Colab, ese fichero se _perderá_
- El fichero estará disponible usando simplemente su nombre

In [56]:
ruta_fichero="wordcount_data.txt"

**Opción 2: Montamos una carpeta de Google Drive que contenga nuestros datos**
- Deberemos tener creada una carpeta "Colab Notebooks" en la raiz de nuestra carpeta de Google Drive (se crea sola al subir el primer Notebook)
- Subimos nuestros datos (por ejemplo, la carpeta `data/` que tenemos en los ejemplos de Spark)
- Para tener las rutas más limpias, creamos un enlace simbólico para que los datos estén disponibles en la carpeta workspace


In [46]:
# Montamos la carpeta (nos pedirá permisos)
from google.colab import drive
drive.mount('/content/drive')
# Crea un atajo llamado 'workspace' en la carpeta /content (dará un pequeño error si ya existe)
!ln -s "/content/drive/MyDrive/Colab Notebooks" "/content/workspace" >/dev/null 2>&1
# Ya podemos acceder a los ficheros, por ejemplo:
ruta_fichero = "/content/workspace/data/wordcount_data.txt"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [9]:
rdd = spark.sparkContext.textFile(ruta_fichero)

In [10]:
rdd.count() # Numero de lineas

44

In [59]:
rdd_word_count =(rdd
    .flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda x, y: x + y))

In [51]:
rdd_word_count.saveAsTextFile("output_wordcount") # Si ya existe el directorio, da error, podemos ejecutar en la terminal rm -rf output_wordcount/

In [27]:
rdd_word_count.takeOrdered(10, key=lambda x: -x[1])

[('the', 38),
 ('a', 28),
 ('of', 25),
 ('word', 24),
 ('and', 23),
 ('words', 21),
 ('is', 19),
 ('to', 18),
 ('count', 11),
 ('in', 11)]

Finalmente, podemos cerrar la sesión de Spark cuando ya no la necesitemos. Si lo hacemos, habrá que ejecutar todo el notebook de nuevo para volver a crearla.

In [28]:
spark.stop()