<a href="https://colab.research.google.com/github/N34R20/PySpark/blob/main/Introduccio%CC%81n_a_Spark_%26_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<img src='https://th.bing.com/th/id/R.38bf6a6c11d543d1015e724f002798a4?rik=25ujuy3w4NDXtg&pid=ImgRaw&r=0' width='210'/>

**Apache Spark** es un open-source framework para ejecutar codigo de manera paralela en maquinas diferentes, esto ultimo se conoce como procesamiento distribuido en un cluster. Esta casi en su totalidad escrito en Scala pero podemos usar Spark escribiendo codigo en Python con la libreria PySpark.

# Spark Stack
---

<img src='https://github.com/engcarlosperezmolero/resources_and_tools/blob/main/imgs/pyspark-class/spark-stack.png?raw=true' />

# Conceptos Claves
---

### SparkSession (tambien conocido como ```spark```)
Una clase definida en el paquete de ```pyspark.sql```. Esta clase es el punto de acceso unificado para programar Spark usando la API Estructurada (DataFrame y Dataset). Se usa normalmente para crear Dataframes, registrar Dataframes como tablas, ejecutar SQL sobre tablas y leer archivos parquet. Desde esta clase puedes acceder tambien al SparkContext.

Se le denomina "unificado" porque agrupa los siguientes puntos de acceso a otras funcionalidades de Spark:
- Spark Context
- SQL Context
- Hive Context
- Streaming Context
- Spark Configuration


### SparkContext (tambien conocido como ```sc```)
Una clase definida en el paquete de ```pyspark``` y representa la conexion al Spark Cluster. Es el punto de acceso principal para usar el motor de Spark. Mantiene una conexion con el "cluster manager" de Spark y puede ser usado para crear RDD y hacer broadcasting de variables en ese cluster. Todas las aplicaciones de Spark (incluyendo PySpark Shell y programas de Python aislados) se ejecutan como un conjunto de procesos independientes. Estos procesos son coordinados por el SparkContext  en un programa driver.


### Driver (archivo .py o notebook .ipynb)
Para enviar (submit) un programa de Python a Spark, necesitars escribir un programa Driver con algun Lenguaje que proporcione la conexion a spark (como PySpark). Este programa esta a cargo de crear el SparkContext, RDD y Dataframes.


### Worker (Recursos = Procesadores | Memoria RAM)
En el entorno del cluster de Spark, existen dos tipos de nodos:
- 1 Master (o 2 para hablar de alta disponibilidad, a veces mas)
- Conjunto de Workers.

Worker es cualquier nodo capaz de ejecutar programas en el cluster. Si un proceso es disparado para cierta aplicacion, entonces esta aplicacion adquiere executors y workers, los cuales llevan a cabo la ejecucion de las tareas de la aplicacion de Spark.

Tambien se conocen como nodo Slave (Esclavo).


### Executor (procesos dentro de los Workers)
Son los procesos que existen dentro de los Workers donde se llevan a cabo las tareas designadas. Cada worker genera subprocesos de python donde se envia el codigo del usuario y los datos para ser procesados.




### Cluster Manager
El nodo master tambien es conocido como el cluster manager. Este se encargara administrar el cluster de servidores necesario para que Spark lleve a cabo las tareas. El cluster manager asigna recursos a cada aplicacion dentro del programa driver. Hay 5 tipos de Cluster Manager aceptados por Spark actualmente:
- Standalone: entorno de cluster built-in de Spark.
- Mesos: un kernel de Sistemas Distribuidos.
- Hadoop YARN (Yet Another Resource Negotiator).
- Kubernetes
- Amazon EC2

<img src='https://github.com/engcarlosperezmolero/resources_and_tools/blob/main/imgs/pyspark-class/spark-app-arch.png?raw=true' />

### Particiones
Para permitir a cada executor realizar trabajos en paralelo, Spark divide los datos en chunks (division logica de la data) llamados particiones. Una particion es la unidad basica de paralelismo en Spark que esta en una maquina fisica (nodo) de tu cluster. Con DataFrames no manipularas particiones manualmente o individualmente, solo especificaras transformaciones de alto nivel. Spark tomara tus instrucciones y se encargara de hacer el trabajo de bajo nivel (usando la RDD API).

# RDD: Resilient Distributed Dataset
---

Es literalmente una coleccion de particiones que a su vez es una coleccion de objetos, cualquier clase de objeto puede estar presente, es muy flexible en comparación al Dataframe (el cual es una coleccion de columnas) pero tambien esta caracteristica lo hace mas dificil de operar. Spark tiene 3 tipos de abstracciones de datos RDD es solo 1 de las 3 que existen (siendo las otras dos Dataframes y Datasets).


- Se denota como RDD[T], cada elemento tiene tipo T.


In [None]:
!pip install pyspark -q

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# .config("spark.sql.repl.eagerEval.enabled", "True")\

sc = spark.sparkContext

In [None]:
collection = [1, "two", 3.0, ("four", 4), {"five": 5}] # entero, texto, tupla, diccionario
collection_rdd = sc.parallelize(collection) # este es el comando que se usa para crear el RDD a partir de una lista

In [None]:
type(collection_rdd)

pyspark.rdd.RDD

In [None]:
collection_rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:287

In [None]:
collection_rdd.collect()

[1, 'two', 3.0, ('four', 4), {'five': 5}]

In [None]:
[i for i in dir(collection_rdd) if not i.startswith("_")]

['aggregate',
 'aggregateByKey',
 'barrier',
 'cache',
 'cartesian',
 'checkpoint',
 'cleanShuffleDependencies',
 'coalesce',
 'cogroup',
 'collect',
 'collectAsMap',
 'collectWithJobGroup',
 'combineByKey',
 'context',
 'count',
 'countApprox',
 'countApproxDistinct',
 'countByKey',
 'countByValue',
 'ctx',
 'distinct',
 'filter',
 'first',
 'flatMap',
 'flatMapValues',
 'fold',
 'foldByKey',
 'foreach',
 'foreachPartition',
 'fullOuterJoin',
 'getCheckpointFile',
 'getNumPartitions',
 'getResourceProfile',
 'getStorageLevel',
 'glom',
 'groupBy',
 'groupByKey',
 'groupWith',
 'has_resource_profile',
 'histogram',
 'id',
 'intersection',
 'isCheckpointed',
 'isEmpty',
 'isLocallyCheckpointed',
 'is_cached',
 'is_checkpointed',
 'join',
 'keyBy',
 'keys',
 'leftOuterJoin',
 'localCheckpoint',
 'lookup',
 'map',
 'mapPartitions',
 'mapPartitionsWithIndex',
 'mapPartitionsWithSplit',
 'mapValues',
 'max',
 'mean',
 'meanApprox',
 'min',
 'name',
 'partitionBy',
 'partitioner',
 'persist'

# Transformaciones y Acciones
---

## Transformaciones (Lazy Evaluated)
Ya que en spark las estructuras de datos fundamentales son inmutables, la unica manera de modificar un estructura (RDD o Dataframe) es a traves de una transformacion, esta es una operacion que crea un nuevo RDD o Dataframe.


- Entonces transforma un RDD de entrada a un RDD (o varios) de salida.
- Una transformacion es basicamente una funcion.
- Si un RDD falla durante una transformacion, el linaje de los datos (data lineage descrito por DAG) reconstruye el RDD.


In [None]:
list_1 = [1, 2, 3, 4]
rdd_1 = sc.parallelize(list_1) # creando el RDD[int]

In [None]:
rdd_2 = rdd_1.map(lambda x: x * 10) # al hacer una transformacion creamos el rdd_2 a partir de rdd_1

### Narrow Dependencies Transformations (1 a 1)
Son aquellas en donde cada particion de entrada despues de transformada correspondera a solo una particion de salida. El ejemplo de arriba (map) es una Narrow Transformation.

<img src='https://github.com/engcarlosperezmolero/resources_and_tools/blob/main/imgs/pyspark-class/narrow_transformation.png?raw=true' />



### Wide Dependencies Transformations (Shuffle - 1 a N)
Son aquellas en donde al menos una de las particiones de entrada despues de transformada correspondera a varias particiones de salida. Tambien se llama Shuffle porque Spark intercambiara particiones a traves de todo el cluster. Mientras que con las Narrow Transformations Spark automaticamente hara una operacion llamada Pipelining (en donde todo sucede en memoria), para los Shuffles Spark escribira los resultados en disco.

<img src='https://github.com/engcarlosperezmolero/resources_and_tools/blob/main/imgs/pyspark-class/wide_transformation.png?raw=true' />

### Lazy Evaluation
Esto es simplemente que Spark esperar para ejecutar una serie de Transformaciones (representadas en forma de DAG) hasta que sea realmente necesario (dado que se realizo una Acción). Gracias a esto Spark es capaz de optimizar el flujo de la data de principio a fin, sin tener nosotros que preocuparnos por eso.
- Leer un archivo, transformarlo y luego filtrarlo, no sera mas eficiente que filtrarlo y luego transformarlo.

### ¿Por qué importa saber la diferencia entre ambas transformaciones?

In [None]:
apariciones_desordenadas = [("a", 1), ("b", 3), ("a", 2), ("b", 5), ("c", 2), ("a", 2), ("c", 3)]
rdd_apariciones = sc.parallelize(apariciones_desordenadas) # RDD[(string, int)]

In [None]:
rdd_apariciones.saveAsTextFile('./apariciones')

In [None]:
# opcion 1 (no recomendada ya que causa mas suffling)
rdd_apariciones.groupByKey().mapValues(lambda values: sum(values)).collect()

[('a', 5), ('b', 8), ('c', 5)]

In [None]:
# opcion 2 (casi siempre el reduceByKey sera mas eficiente que el patron de arriba)
rdd_apariciones.reduceByKey(lambda x, y: x + y).collect()

[('a', 5), ('b', 8), ('c', 5)]

## Acciones
Estas son operaciones o funciones de un RDD que producen valores distintos de un RDD o Dataframe. Son muy importantes ya que estas disparan la ejecucion de los ya construidos lazy RDDs a traves de transformaciones.

Las acciones pueden convertir un RDD en cosas tangibles como:
- un archivo guardado.
- un entero.
- un conteo de elementos.
- una lista o un diccionario.

In [None]:
rdd_2.count() # produce un entero

4

In [None]:
rdd_2.collect() # produce una lista de enteros (NO USAR EN RDD GRANDES en servidores de produccion), usar take en su lugar

[10, 20, 30, 40]

In [None]:
rdd_2.take(2) # DataFrame.take(N) devuelve una lista de las primeras N filas como una lista de objetos Row

[10, 20]

In [None]:
rdd_2.getNumPartitions() # un entero

1

In [None]:
rdd_2.saveAsTextFile("./rdd_2",) # produce archivos guardados en este caso 2

In [None]:
rdd_2.coalesce(1).saveAsTextFile('./rdd_2_coalesce') # produce un archivo

# Persistencia (cache y persist)

Es muy util persistir la data cuando se necesita acceder frecuentemente a la misma data, como por ejemplo cuando se esta ejecutando un algoritmo iterativo o cuando quieres realizar distintas acciones sobre un mismo linaje de transformaciones. Esto permite crear un proceso mas eficiente en cuanto a tiempo y costo de recursos.

Hay dos maneras de persistir RDDs en Spark:
1. ```cache()```
2. ```persist()```

Existen distintos tipos de almacenamiento para persist (que se importan de ```pyspark.StorageLevel```)


Asi mismo si se necesita vaciar el cache de un nodo manualmente entonces se usa el metodo ```unpersist()```

In [None]:
import pyspark

In [None]:
[i for i in dir(pyspark.StorageLevel) if not i.startswith("_")]

['DISK_ONLY',
 'DISK_ONLY_2',
 'DISK_ONLY_3',
 'MEMORY_AND_DISK',
 'MEMORY_AND_DISK_2',
 'MEMORY_AND_DISK_DESER',
 'MEMORY_ONLY',
 'MEMORY_ONLY_2',
 'NONE',
 'OFF_HEAP']

In [None]:
rdd_1 = sc.parallelize(["hola", "soy", "charly"])

In [None]:
import time

def crear_delay(x):
    time.sleep(5)
    return (x,1) # ("hola", 1)

rdd_2 = rdd_1.map(lambda x: crear_delay(x))

In [None]:
# este resultado tarda 15 segundos aprox porque solo tenemos un nodo (no hay realmente una mejora porque no se puede realizar de manera paralela)
%%time
rdd_2.collect()

CPU times: user 86.5 ms, sys: 15.3 ms, total: 102 ms
Wall time: 15.4 s


[('hola', 1), ('soy', 1), ('charly', 1)]

In [None]:
rdd_2.cache()

PythonRDD[27] at collect at <timed eval>:1

In [None]:
# ya que dijimos que sea cacheada entonces el resultado de esta celda sera cacheado, para las mismas transformaciones que crean rdd_2
%%time
rdd_2.collect()

CPU times: user 69.8 ms, sys: 12.7 ms, total: 82.5 ms
Wall time: 15.3 s


[('hola', 1), ('soy', 1), ('charly', 1)]

In [None]:
# ahora este resultado sera inmediato porque no esta realizando las transformaciones, simplemente agarra rdd_2 directamente de la memoria
%%time
rdd_2.collect()

CPU times: user 3.35 ms, sys: 720 µs, total: 4.07 ms
Wall time: 64.7 ms


[('hola', 1), ('soy', 1), ('charly', 1)]

In [None]:
rdd_2.is_cached

True

In [None]:
# es importante realizar la limpieza de la memoria para no recibir luego un error de OutOfMemory
rdd_2.unpersist()

PythonRDD[27] at collect at <timed eval>:1

In [None]:
# ya que el unpersist() lo saca de la memoria entonces ahora vuelve a ejecutar todas las transformaciones
%%time
rdd_2.collect()

CPU times: user 81.1 ms, sys: 8.62 ms, total: 89.7 ms
Wall time: 15.3 s


[('hola', 1), ('soy', 1), ('charly', 1)]

In [None]:
# realizando la persistencia en otro nivel
rdd_2.persist(pyspark.StorageLevel.DISK_ONLY)

PythonRDD[27] at collect at <timed eval>:1

In [None]:
%%time
rdd_2.collect()

CPU times: user 72 ms, sys: 13.8 ms, total: 85.8 ms
Wall time: 15.3 s


[('hola', 1), ('soy', 1), ('charly', 1)]

In [None]:
%%time
rdd_2.collect()

CPU times: user 6.23 ms, sys: 851 µs, total: 7.09 ms
Wall time: 64.1 ms


[('hola', 1), ('soy', 1), ('charly', 1)]

In [None]:
print(rdd_2.getStorageLevel())
rdd_2.unpersist()
print(rdd_2.getStorageLevel())

Disk Serialized 1x Replicated
Serialized 1x Replicated


In [None]:
print(rdd_2.is_cached)

False


In [None]:
rdd_2.cache()

PythonRDD[27] at collect at <timed eval>:1

In [None]:
print(rdd_2.getStorageLevel())
rdd_2.unpersist()
print(rdd_2.getStorageLevel())

Memory Serialized 1x Replicated
Serialized 1x Replicated


# Variables tipo Broadcast
Permiten usar un variable (read only) cacheada en cada nodo en lugar de enviar una copia de la variable en cada tarea. Pueden ser usada por ejemplo para darle una copia a cada nodo de un dataset grande que sea usado como input.

In [None]:
bc_var = sc.broadcast([1, 2, 3, 4, 5])

In [None]:
[i for i in dir(bc_var) if not i.startswith("_")]

['destroy', 'dump', 'load', 'load_from_path', 'unpersist', 'value']

In [None]:
bc_var

<pyspark.broadcast.Broadcast at 0x7d86b5e7a170>

In [None]:
bc_var.value

[1, 2, 3, 4, 5]

In [None]:
bc_var.unpersist() # esto borrara las copias de los cache de cada executor

In [None]:
lista = [1,2,3,4,5,6,7,8,9]
suma = 0
for num in lista:
    suma += num
print(suma)

45


In [None]:
# acumuladores...
ac_var = sc.accumulator(0)

In [None]:
sc.parallelize([1,2,3,4,5,6,7,8,9]).foreach(lambda x: ac_var.add(x))

In [None]:
ac_var.value

45

# Spark UI
---


Desde aqui podras monitorear el progreso de las tareas ejecutadas por spark, asi como visualizar los planes realizados por el master. Se puede visualizar en el nodo 4040 (por defecto) del nodo master. En nuestro caso lo mostraremos en el 4050. La interfaz es realmente util para hacer tunning y debugging.

Normalmente si estuvieses en tu computadora podrias ver la interfaz simplemente al escribir:
http://localhost:4040/


En este caso ya que estamos en una instancia de Colab necesitamos usar Ngrok para enviar lo que sale del puerto 4050 a una url publica temporal.

## Usando Ngrok para visibilizar el SparkUI desde colab

In [None]:
import requests
with open("./ngrok_tunnel.py", "wb") as f:
    f.write(requests.get("https://raw.githubusercontent.com/engcarlosperezmolero/resources_and_tools/main/tools/ngrok_tunnel.py").content)

In [None]:
from ngrok_tunnel import NgrokTunnel

In [None]:
ngrok = NgrokTunnel('2SnEdNgWGVppdDd5wUjVjYKCPgN_24kR77UZQ2yhgQmbhh5x4', 'linux') # se recomienda cambiar la TOKEN de NGROK api (crear cuenta)
ngrok.download_and_unzip('https://bin.equinox.io/c/bNyj1mQVY4c/ngrok-v3-stable-linux-amd64.tgz')
ngrok.run_ngrok(4050)

Download complete and extraction complete!


In [None]:
ngrok.get_public_url()

Public url: 

ConnectionError: ignored

# Leyendo un archivo usando un RDD
---

In [None]:
%%writefile test.txt
Hola que tal soy charly de humai
Que tal charly soy Spark
Pero a mi me gusta Python
Genial puedes usar pyspark para comunicarte con Spark
Usando solo Python
Asi es usando solo Python
Genial

Writing test.txt


## Usando SparkSession directamente

In [None]:
lines_rdd_session = spark.read.text('test.txt').rdd.map(lambda r: r[0]) # RDD[string]

In [None]:
lines_rdd_session.collect()

['Hola que tal soy charly de humai',
 'Que tal charly soy Spark',
 'Pero a mi me gusta Python',
 'Genial puedes usar pyspark para comunicarte con Spark',
 'Usando solo Python',
 'Asi es usando solo Python',
 'Genial']

## Usando SparkContext

In [None]:
lines_rdd_context = sc.textFile('test.txt')

In [None]:
lines_rdd_context.collect()

['Hola que tal soy charly de humai',
 'Que tal charly soy Spark',
 'Pero a mi me gusta Python',
 'Genial puedes usar pyspark para comunicarte con Spark',
 'Usando solo Python',
 'Asi es usando solo Python',
 'Genial']

## Realizando algunas transformaciones

In [None]:
words = lines_rdd_context.flatMap(lambda s: s.split(' ')) # 1 a varios, primero aplica la funcion a cada elemento y luego "aplana" el resultado

In [None]:
words_tuple = words.map(lambda word: (word, 1))

In [None]:
words_count = words_tuple.reduceByKey(lambda x, y: x + y)

In [None]:
words_count.collectAsMap()

{'Hola': 1,
 'que': 1,
 'tal': 2,
 'soy': 2,
 'charly': 2,
 'de': 1,
 'humai': 1,
 'Que': 1,
 'Spark': 2,
 'Pero': 1,
 'a': 1,
 'mi': 1,
 'me': 1,
 'gusta': 1,
 'Python': 3,
 'Genial': 2,
 'puedes': 1,
 'usar': 1,
 'pyspark': 1,
 'para': 1,
 'comunicarte': 1,
 'con': 1,
 'Usando': 1,
 'solo': 2,
 'Asi': 1,
 'es': 1,
 'usando': 1}

# Si la instalación normal llega a fallar
---

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# si falla el wget buscar en la proxima pagina cual es el link que esta funcionando
# https://spark.apache.org/downloads.html
!wget -q https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
!tar xf spark-3.3.0-bin-hadoop3.tgz # cambiar el nombren segun el nombre correcto del archivo

tar: spark-3.3.0-bin-hadoop3.tgz: Cannot open: No such file or directory
tar: Error is not recoverable: exiting now


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3" # cambiar el nombre segun el nombre correcto del archivo

In [None]:
!pip install -q findspark

In [None]:
import findspark # https://github.com/minrk/findspark
findspark.init()

In [None]:
import pyspark