<a href="https://colab.research.google.com/github/EduardoPrieto/ELIA/blob/master/Spark_RDDs_vs_Datasets.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Diferencias entre RDDs y Datasets en Apache Spark

En Apache Spark, los Resilient Distributed Datasets (RDDs) y los Datasets son dos abstracciones de datos distribuidas que permiten realizar operaciones paralelas en grandes conjuntos de datos. A continuación, se presentan algunas diferencias clave:

## 1. Tipos de Datos

* ### RDDs (Resilient Distributed Datasets)

Los RDDs son una colección distribuida de objetos inmutables y tolerantes a fallos. Pueden contener cualquier tipo de objeto y se almacenan en particiones distribuidas a través de un clúster Spark. Sin embargo, la información sobre el tipo de datos se pierde cuando se almacenan en un RDD, y las operaciones en RDDs son principalmente de bajo nivel.

* ### Datasets

Los Datasets son una extensión de los RDDs y proporcionan una API más rica y orientada a tipos de datos más fuertes. Están diseñados para trabajar con datos estructurados mediante la introducción de un sistema de tipos de datos más rico que los RDDs. Pueden trabajar con datos de tipo primitivo y también con objetos de tipo usuario, manteniendo la información sobre el tipo de datos.

## 2. Optimización de Consultas

* ### RDDs

Las operaciones en RDDs son evaluadas de manera perezosa (lazy evaluation), lo que significa que las transformaciones no se ejecutan de inmediato, sino que se planifican para su ejecución cuando se dispara una acción. Sin embargo, el motor de Spark no tiene un conocimiento profundo de la estructura de datos dentro de un RDD, lo que puede limitar las oportunidades de optimización.

* ### Datasets

Los Datasets, al ser orientados a tipos, permiten un mejor rendimiento a través de la optimización de consultas y el uso de la reflexión de tipos para aplicar reglas de optimización durante la compilación. Esto puede resultar en un rendimiento más eficiente en comparación con RDDs para operaciones complejas.

## 3. Tolerancia a Fallos

* ### RDDs

Los RDDs son inmutables y tolerantes a fallos por diseño. Cualquier transformación en un RDD crea un nuevo RDD en lugar de modificar el RDD existente. Esto facilita la recuperación ante fallos.

* ### Datasets

Los Datasets también son tolerantes a fallos y ofrecen una tolerancia a fallos similar a los RDDs.



## RDDs

In [None]:
"""
Los RDDs usan la programación orientada a objetos como forma de orquestar los ETL
importante tener una buena comprensión de lambda puesto que es crucial dentro de esta tecnologia
"""
from pyspark import SparkContext

sc = SparkContext("local", "EjemploRDD")

datos_temperatura = [("CiudadA", 25.5), ("CiudadB", 30.0), ("CiudadA", 28.3),
                     ("CiudadB", 32.1), ("CiudadA", 26.8), ("CiudadB", 31.5)]

rdd_temperatura = sc.parallelize(datos_temperatura)

rdd_promedio_por_ciudad = (
    rdd_temperatura
    .map(lambda x: (x[0], (x[1], 1)))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    .mapValues(lambda x: x[0] / x[1])
)


resultados = rdd_promedio_por_ciudad.collect()
for ciudad, promedio in resultados:
    print(f"La temperatura promedio en {ciudad} es {promedio} grados Celsius.")

# Importante para el contexto que se creo, puesto que podria haber problemas mas adelante


sc.stop()


## *Datasets*

Este enfoque con DataFrames es más conciso y fácil de leer en comparación con el uso directo de RDDs. Además, los DataFrames ofrecen optimizaciones internas, como la ejecución distribuida y la optimización de consultas, que pueden mejorar el rendimiento en comparación con RDDs para muchas operaciones comunes.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("EjemploDataset").getOrCreate()

## Retomamos el ejemplo anterior
datos_temperatura = [("CiudadA", 25.5), ("CiudadB", 30.0), ("CiudadA", 28.3),
                     ("CiudadB", 32.1), ("CiudadA", 26.8), ("CiudadB", 31.5)]

columnas = ["Ciudad", "Temperatura"]
df_temperatura = spark.createDataFrame(datos_temperatura, columns)

# Realizar operaciones con el DataFrame para calcular el promedio de temperatura por ciudad
# Para esto podemos manejarlo de la misma forma que pandas
df_promedio_por_ciudad = (
    df_temperatura
    .groupBy("Ciudad")
    .agg(F.avg("Temperatura").alias("PromedioTemperatura"))
)


df_promedio_por_ciudad.show()

# Detener la SparkSession
"""IMPORTANTE"""
spark.stop()


### Datasets usando lenguaje SQL

es posible utilizar lenguaje SQL en PySpark con DataFrames. PySpark proporciona una interfaz SQL que te permite ejecutar consultas SQL directamente sobre los DataFrames.
Este enfoque es útil cuando estás familiarizado con SQL y prefieres expresar tus operaciones de manipulación de datos de esa manera. Ten en cuenta que, internamente, Spark optimizará la ejecución de estas consultas SQL para aprovechar al máximo su motor distribuido.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("EjemploSQL").getOrCreate()

datos_temperatura = [("CiudadA", 25.5), ("CiudadB", 30.0), ("CiudadA", 28.3),
                     ("CiudadB", 32.1), ("CiudadA", 26.8), ("CiudadB", 31.5)]

columnas = ["Ciudad", "Temperatura"]
df_temperatura = spark.createDataFrame(datos_temperatura, columnas)

# Registrar el DataFrame como una tabla temporal
df_temperatura.createOrReplaceTempView("tabla_temperatura")

# Ejecutar una consulta SQL sobre el DataFrame
resultado_sql = spark.sql("SELECT Ciudad, AVG(Temperatura) as PromedioTemperatura FROM tabla_temperatura GROUP BY Ciudad")


resultado_sql.show()

# Detener la SparkSession
"""IMPORTANTE"""
spark.stop()
