<a href="https://colab.research.google.com/github/RogerCS17/portfolio-python/blob/main/Big%20Data/Spark/exercises_spark04.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Aspectos Avanzados sobre RDD

## Almacenamiento en caché

El almacenamiento en caché permite que Spark conserve los datos en todos los cálculos y operaciones.

- Se usa persist( ) o cache( ) para almacenarlo en caché
- cache( ) es simplemente un sinónimo de persist(MEMORY_ONLY)

In [1]:
# Se instala PySpark
!pip install pyspark

# Se importa pyspark
from pyspark.sql import SparkSession

# Se crea una sesión
spark = SparkSession.builder.getOrCreate()

# Se crea un SparkContext
sc = spark.sparkContext



In [2]:
# Se crea un RDD
rdd = sc.parallelize([item for item in range(10)])

# Se muestra el resultado
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [3]:
# Se importa StorageLevel
from pyspark.storagelevel import StorageLevel

# Se escoge el nivel de persistencia del RDD
rdd.persist(StorageLevel.MEMORY_ONLY)

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289

In [4]:
# Si se quiere cambiar el nivel de persistencia, se debe usar unpersist
rdd.unpersist()

# Ahora se le asigna un nuevo nivel de persistencia
rdd.persist(StorageLevel.DISK_ONLY)

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289

## Particionado

Los RDD operan con datos no como una sola masa de datos, sino que administran y operan los datos en particiones repartidas por todo el clúster
- El número de particiones es importante

Particionadores
- HashPartitioner: Es el particionador predeterminado en Spark.

  Fórmula: partitionIndex = hash(key) % numPartitions
- RangePartitioner: Funciona dividiendo el RDD en rangos aproximadamente iguales


In [5]:
# Se crea el rdd
rdd = sc.parallelize(["x", "y", "z"])

# Se establece el número de particiones
num_partitions = 6

# Se aplica la fórmula
hash("z") % num_partitions

4

## Broadcast Variables
Las variables broadcast son variables compartidas entre todos los ejecutores. Estos se crean una vez en el controlador y luego se leen sólo en los ejecutores

In [6]:
# Se crea el RDD
rdd = sc.parallelize([item for item in range(10)])

# Se muestra el resultado
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [7]:
# Se crea el broadcast
number = 1
br_number = sc.broadcast(number)

# Se crea otro rdd
rdd_number = rdd.map(lambda x: x + br_number.value)

# Se muestra el resultado
rdd_number.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

## Acumuladores

Los acumuladores son variables compartidos entre ejecutores que normalmente se utilizan para agregar contadores a su programa Spark.
- sparkContext.accumulator( ): Se usa para definir variables de acumulador.
- add( ): Se usa para agregar/actualizar un valor en el acumulador.
- La propiedad value de la variable del acumulador se utiliza para recuperar el valor del acumulador.

In [8]:
# Se crea el acumulador
acc = sc.accumulator(0)

# Se crea el RDD
rdd = sc.parallelize([2,4,6,8,10])

# Se usa el acumulador
rdd.foreach(lambda x: acc.add(x))

# Se muestra el resultado
acc.value

30

## Ejercicios

1. Cree un RDD importes a partir de los datos adjuntos a esta lección como recurso. Emplee acumuladores para obtener el total de ventas realizadas y el importe total de las ventas.

In [9]:
# Se lee el archivo
imports = sc.textFile("./rdd.txt")

In [10]:
# Se define los acumuladores
total_sales = sc.accumulator(0)
import_total = sc.accumulator(0)

# Se calcula la cantidad de ventas totales
imports.foreach(lambda x: total_sales.add(1))

# Se calcula el import total
imports.foreach(lambda x: import_total.add(float(x)))

# Se muestra lso resultados
print(f"Se vendio la cantidad total de: {total_sales.value} y la venta total fue de: {import_total.value}")

Se vendio la cantidad total de: 10000 y la venta total fue de: 5042335.0


2. Si se conoce que a cada venta hay que restarle un importe fijo igual a 10 pesos por temas de impuestos.

- ¿Cómo restaría este impuesto de cada venta utilizando una variable broadcast para acelerar el proceso?

In [11]:
# Se crea el broadcast
tax = sc.broadcast(10)

- Cree un RDD llamado ventas_sin_impuestos a partir de la propuesta del inciso a que contenga las ventas sin impuestos.

In [12]:
# Se crea el RDD
sales_no_tax = imports.map(lambda x: float(x) - tax.value)

# Se muestra el resultado
sales_no_tax.take(5)

[517.0, 376.0, 691.0, 230.0, 931.0]

- Destruya la variable broadcast creada luego de emplearla para crear el RDD del inciso b.

In [13]:
tax.destroy()

3. Persista el RDD ventas_sin_impuestos en los siguientes niveles de persistencia.

In [14]:
# Se importa StorageLevel
from pyspark.storagelevel import StorageLevel

# A nivel de memoria
sales_no_tax.cache()

# Se quita el nivel de persistencia
sales_no_tax.unpersist()

# A nivel de Disco
sales_no_tax.persist(StorageLevel.DISK_ONLY)

# Se quita el nivel de persistencia
sales_no_tax.unpersist()

# A nivel de memoria y disco
sales_no_tax.persist(StorageLevel.MEMORY_AND_DISK)

PythonRDD[11] at RDD at PythonRDD.scala:53