In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=0196b7ad75b34f70cac8a64df0a2bd6f44a90454dccad9a591c2618c0febfd4a
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


# Usando Spark en Python

En este notebook vamos a aprender a desarrollar en Spark con Python, para eso vamos a usar la librería `pyspark`. Al ejecutar la celda anterior, estamos instalando `pyspark` en Google Colab. Ahora estamos listos para dar nuestros primeros pasos con esta librería.

**Importante**: recuerda que comunmente usamos `pyspark` junto con _dataframes_ o con SQL, pero la idea de este notebook es entender las operaciones de más bajo nivel que nos ofrece Apache Spark.

Vamos a partir cargando la librería e inicializando la variable `sc`, que es el contexto de Spark. Cuando estamos corriendo Spark en un cluster, este contexto apunta al cluster de computadores, y finalmente el código que se corre en un entorno local (single node) es igual al que se corre en un entorno distribuido.

In [23]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .getOrCreate()
    
sc = spark.sparkContext
sc

## Primera parte: el hello world de la computación distribuida

Lo primero que vamos a hacer es ejecutar el _hello world_ en computación distribuida, que es contar el número de palabras por palabra de un archivo. Para esto, cargamos el archivo `demo_file_spark.txt` que está junto a este _notebook_.

In [24]:
text_file = sc.textFile("demo_file_spark.txt")
text_file

demo_file_spark.txt MapPartitionsRDD[27] at textFile at NativeMethodAccessorImpl.java:0

La variable `text_file` es de tipo `RDD`: Resilient Distributed Dataset. Este es un archivo que está potencialmente distribuido en varios nodos (por ejemplo, en un HDFS). No vemos nada al cargar la variable `demo_file_spark.txt` porque está distribuido, así que para cargarlo en un nodo e imprimirlo debemos usar la función `collect()`.

In [25]:
text_file.collect()

['Deer Bear River', 'Car Car River', 'Deer Car Bear']

Como vemos, cada línea del archivo es parte de una lista. Al trabajar en un cluster, si no hubiesemos ejecutado la función `collect()`, cada elemento de la lista podría estar en cualquiera de los nodos del cluster. 

Ahora vamos a ejecutar nuestro primer `map()`. Esta función tomará cada línea y hará un `split()` por espacio.

In [26]:
text_file.map(lambda line: line.split(" ")).collect()

[['Deer', 'Bear', 'River'], ['Car', 'Car', 'River'], ['Deer', 'Car', 'Bear']]

Como vemos, es lo que esperamos de una función `map()`. Notemos que si no hacemos `collect()` no podríamos ver los resultados, porque el `map()` se ejecuta de forma distribuida. 

Ahora vamos a hacer un `flatMap()`, que "aplana" los resultados de la función `map()`. Esto es, si la función `map()` genera una colección de elementos de largo mayor que 1, entonces los elementos "salen" de la colección.

In [27]:
text_file.flatMap(lambda line: line.split(" ")).collect()

['Deer', 'Bear', 'River', 'Car', 'Car', 'River', 'Deer', 'Car', 'Bear']

Como vemos, en vez de tener una lista de listas, nos quedamos con una lista de palabras. Ahora, para continuar con el conteo de la aparición de cada palabra podemos hacer que cada palabra se emita junto al número 1, para después sumar cuantos 1s hay por palabra.

In [28]:
text_file.flatMap(lambda line: line.split(" ")) \
  .map(lambda word: (word, 1)).collect()

[('Deer', 1),
 ('Bear', 1),
 ('River', 1),
 ('Car', 1),
 ('Car', 1),
 ('River', 1),
 ('Deer', 1),
 ('Car', 1),
 ('Bear', 1)]

Y el resultado anterior lo podemos reducir para cada una de sus llaves.

In [29]:
counts = text_file.flatMap(lambda line: line.split(" ")) \
  .map(lambda word: (word, 1)) \
  .reduceByKey(lambda a, b: a + b)
  
counts.collect()

[('Bear', 2), ('Deer', 2), ('River', 2), ('Car', 3)]

En este caso, para cada par emitido por la instrucción `.map(lambda word: (word, 1))`, lo que hacemos es considerar como llave el primer elemento de la tupla, y como valor el segundo. Así, al aplicar `reduceByKey`, para cada llave, armamos una colección con todos los elementos que le corresponden a esa llave (que serían solamente 1s). En el caso del archivo anterior sería algo así:

```Bash
"Deer": [1, 1]
"Bear": [1, 1]
"River" [1, 1]
"Car": [1, 1, 1]
```

Luego, aplicamos la función `lambda a, b: a + b` a cada una de las listas, por cada llave.

## Leyendo un archivo con logs

Un caso de uso típico para usar Apache Spark es la lectura de archivos de logs extensos. En caso de tener registros amplios, es natural montar estos archivos en un sistema de archivos distribuido para analizarlos. Ahora vamos a ver un ejemplo cargando el archivo `logs.txt` que encontrarás junto a este _notebook_.

In [30]:
text_file = sc.textFile("logs.txt")

# Toma una muestra aleatoria de 10 elementos sin reemplazo
text_file.takeSample(False, 10)

['Operación 151 ejecutada en la base de datos',
 'Operación 543 ejecutada en la aplicación web',
 'Operación 946 ejecutada en la base de datos',
 'Operación 274 ejecutada en la aplicación web',
 'Operación 815 ejecutada en la aplicación web',
 'Operación 434 ejecutada en la base de datos',
 'Operación 466 ejecutada en la aplicación web',
 'Operación 897 ejecutada en la base de datos',
 'Operación 334 ejecutada en la aplicación web',
 'Error de la aplicación web de tipo 500 en la operación 70']

Para no ver el archivo entero, estamos obteniendo un sample aleatorio sin reemplazo de 10 elementos. Ahora que entendemos cómo se ve el archivo, intentemos encontrar todos los registros que hablan de errores. Para esto vamos a usar la función `filter()`, que retorna un `RDD` nuevo que solo contiene los elementos que cumplen con cierta condición.

In [31]:
logs_filter = text_file.filter(lambda line: "Error" in line)
logs_filter.collect()

['Error de la base de datos en la operación 10',
 'Error de la aplicación web de tipo 500 en la operación 14',
 'Error de la base de datos en la operación 15',
 'Error de la aplicación web de tipo 500 en la operación 17',
 'Error de la aplicación web de tipo 400 en la operación 27',
 'Error de la aplicación web de tipo 500 en la operación 28',
 'Error de la base de datos en la operación 31',
 'Error de la aplicación web de tipo 400 en la operación 35',
 'Error de la base de datos en la operación 37',
 'Error de la aplicación web de tipo 400 en la operación 39',
 'Error de la aplicación web de tipo 500 en la operación 40',
 'Error de la base de datos en la operación 41',
 'Error de la base de datos en la operación 42',
 'Error de la aplicación web de tipo 500 en la operación 43',
 'Error de la base de datos en la operación 45',
 'Error de la base de datos en la operación 46',
 'Error de la base de datos en la operación 49',
 'Error de la base de datos en la operación 50',
 'Error de la 

Ahora puedes probar tú haciendo otra clase de filtros sobre el archivo.

## Cargando un archivo como DataFrame

Si bien no es el objetivo de este _notebook_, vamos a mostrar cómo cargar el archivo anterior en un `DataFrame` para conseguir el mismo filtro.

In [33]:
from pyspark.sql import Row
from pyspark.sql.functions import col

textFile = sc.textFile("logs.txt")

# Creamos un DataFrame con una única columna llamada line
df = textFile.map(lambda r: Row(r)).toDF(["line"])

# Filtramos por las que contengan el texto Error
errors = df.filter(col("line").like("%Error%"))

# Contamos el número de errores
errors.count()

218

Podemos hacer operaciones más complejas y cargar DataFrames de más columnas, pero es importante recordar que la implementación de Spark de un DataFrame es un subconjunto de la que encontramos en Pandas, porque no todas las funciones pueden ser implementadas en el contexto de computación distribuida.

## Joins en pyspark

Ahora probemos cómo hacer joins en `pyspark`, para esto vamos a partir con un ejemplo muy sencillo.

In [38]:
# Ejemplo de tabla 1
t1 = sc.parallelize([("a", 1), ("b", 4)])

# Ejemplo de tabla 2
t2 = sc.parallelize([("a", 2), ("a", 3)])

Estamos instanciando dos listas de tuplas. Esto simula dos tablas.

In [39]:
t1.collect()

[('a', 1), ('b', 4)]

In [40]:
t2.collect()

[('a', 2), ('a', 3)]

Recordemos que en Spark, cuando tenemos una tupla, el primer elemento actua de llave y el segundo de valor. Entonces al usar el comando `join()`, vamos a hacer un join por las llaves, juntando los valores correspondientes.

In [41]:
t1.join(t2).collect()

[('a', (1, 2)), ('a', (1, 3))]

Como vemos, es el resultado que esperamos. Si queremos simular tablas de más atributos, podemos tener un "value" que representa una tupla, para guardar más elementos.

In [42]:
t1 = sc.parallelize([("a", (1, 3)), ("b", (4, 5))])
t2 = sc.parallelize([("a", (2, 6)), ("a", (3, 7))])

t1.join(t2).collect()

[('a', ((1, 3), (2, 6))), ('a', ((1, 3), (3, 7)))]

Ahora vamos a ver cómo cargar líneas de un archivo que representan tablas en `pyspark`. Para esto vamos a cargar el archivo `example_join.txt` que está junto a este _notebook_ y se ve de la siguiente forma:

```
T1,a,1
T1,b,4
T2,a,2
T2,a,3
```

En donde señalamos qué línea pertenece a qué tabla. Para cargar las dos tablas en `pyspark` podemos hacer lo siguiente.

In [44]:
text_file = sc.textFile("example_join.txt")
text_file.collect()

['T1,a,1', 'T1,b,4', 'T2,a,2', 'T2,a,3']

Hacemos funciones para filtrar los elementos de cada tabla.

In [45]:
def get_table_t1(line):
    return line.split(",")[0] == "T1"

def get_table_t2(line):
    return line.split(",")[0] == "T2"

text_file.filter(get_table_t1).collect()

['T1,a,1', 'T1,b,4']

In [46]:
table_t1_raw = text_file.filter(get_table_t1)
table_t2_raw = text_file.filter(get_table_t2)

Ahora pasamos las tuplas de cada tabla al formato que sabemos que necesita `pyspark`.

In [47]:
def convert_to_tuples(line):
    row = line.split(",")
    return (row[1], int(row[2]))

table_t1_raw.map(convert_to_tuples).collect()

[('a', 1), ('b', 4)]

In [48]:
table_t1 = table_t1_raw.map(convert_to_tuples)
table_t2 = table_t2_raw.map(convert_to_tuples)

Y hacemos el join.

In [49]:
table_t1.join(table_t2).collect()

[('a', (1, 2)), ('a', (1, 3))]

## Guardando los resultados

Recordemos que podemos guardar resultados como archivos de texto. Esto es especialmente útil si el resultado es grande y vamos a necesitar guardarlo en un sistema de archivos distribuido.

In [50]:
table_t1.join(table_t2).saveAsTextFile("join_result_folder_v2")

El resultado es una carpeta con cada una de las particiones, que debemos juntar para obtener el resultado completo.

## Broadcasting

Cuando tenemos un set de datos pequeño que necesitamos que haga join con una tabla grande, es preferible enviar la tabla pequeña a cada uno de los nodos del cluster. Este proceso se llama broadcasting. Aquí tenemos un ejemplo.

In [51]:
t1 = {
        1 : 'Red',
        2 : 'Blue',
        3 : 'Green',
        4 : 'Yellow',
    }
t1_broadcast = sc.broadcast(t1)

data = [
    [1, 1.23],
    [2, 2.34],
    [3, 3.45],
    [4, 4.23],
    [1, 32.2],
    [2, 22.2],
    [4, 222.3]
]

t2 = sc.parallelize(data)

t2.map(lambda x: [t1_broadcast.value[x[0]], x[1]]).collect()

[['Red', 1.23],
 ['Blue', 2.34],
 ['Green', 3.45],
 ['Yellow', 4.23],
 ['Red', 32.2],
 ['Blue', 22.2],
 ['Yellow', 222.3]]