# Funciones Avanzadas en Spark

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()

emp = [(1, "AAA", "dept1", 1000),
    (2, "BBB", "dept1", 1100),
    (3, "CCC", "dept1", 3000),
    (4, "DDD", "dept1", 1500),
    (5, "EEE", "dept2", 8000),
    (6, "FFF", "dept2", 7200),
    (7, "GGG", "dept3", 7100),
    (None, None, None, 7500),
    (9, "III", None, 4500),
    (10, None, "dept5", 2500)]

dept = [("dept1", "Department - 1"),
        ("dept2", "Department - 2"),
        ("dept3", "Department - 3"),
        ("dept4", "Department - 4")
       ]

df = spark.createDataFrame(emp, ["id", "name", "dept", "salary"])
deptdf = spark.createDataFrame(dept, ["id", "name"]) 

# Create Temp Tables
df.createOrReplaceTempView("empdf")
deptdf.createOrReplaceTempView("deptdf")

# Save as HIVE tables.
df.write.saveAsTable("hive_empdf", mode = "overwrite")
deptdf.write.saveAsTable("hive_deptdf", mode = "overwrite")

25/01/05 05:12:03 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
25/01/05 05:12:03 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
25/01/05 05:12:04 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
25/01/05 05:12:04 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore frandepy@127.0.1.1
25/01/05 05:12:05 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
25/01/05 05:12:06 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/01/05 05:12:06 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/01/05 05:12:06 WARN MemoryManager: Total allocation exceeds 95.00% (1,

# Broadcast JOIN: 
El objetivo de este tipo de union es mejorar el rendimiento de los dataframes, es una optimizacion para realizar un join de manera mas eficiente, cuando una de las tablas es mas pequenha que las otras.

## Funcionamiento:
Un broadcast join difunde (broadcast) la tabla mas pequena a todos los nodos del cluster, Esto significa que cada nodo recibe una copia de la tabla mas pequena lo que permite realizar el join localmente en cada nodo sin necesidad de cambiar los datos entre ellos.

## Ventajas
- Reduccion del trafico de red.
- Mayor velocidad
- Optimizacion Automatica.

## Limitaciones:
- La tabla pequena debe caber en la memoria de cada nodo, ya que este se replica completamente, si es muy grande puede causar errores de memoria.
- Carga en la memoria: Requiere suficiente espacio de memoria de cada nodo para almacenar la tabla pequena.

## Casos de uso
- Unir una tabla pequena (ej: Lista de parametros) con una tabla grande.

  



## Codigo para determinar el tamano predeterminado de la tabla broadcast en Apache Spark

In [2]:
size_str = spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
size = int(''.join([char for char in size_str if char.isdigit()])) / (1024 * 1024)
print("Default size of broadcast table is {0} MB.".format(size))

Default size of broadcast table is 10.0 MB.


## Se puede cambiar el umbral o tamano por defecto con el siguiente comando:

In [3]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

## Ejemplo
Supongamos que queremos unir 2 datafremes, uno `small_df` que puede caber en la memoria y es mas pequeno que el umbral especificado y `big_df` que es un dataframe que debe unirse con el pequeno.

In [4]:
transactions_data = [
    (1, "2025-01-01", 100.0, "USA"),
    (2, "2025-01-02", 200.0, "CAN"),
    (3, "2025-01-03", 300.0, "MEX"),
    (4, "2025-01-04", 400.0, "USA"),
    (5, "2025-01-05", 500.0, "CAN"),
]

transactions_schema = ["id", "date", "amount", "country"]

transactions_df = spark.createDataFrame(transactions_data, schema=transactions_schema)

# Crear una tabla pequeña de países
countries_data = [
    ("USA", "United States"),
    ("CAN", "Canada"),
    ("MEX", "Mexico"),
]

countries_schema = ["code", "name"]
countries_df = spark.createDataFrame(countries_data, schema=countries_schema)

transactions_df.show()
countries_df.show()

+---+----------+------+-------+
| id|      date|amount|country|
+---+----------+------+-------+
|  1|2025-01-01| 100.0|    USA|
|  2|2025-01-02| 200.0|    CAN|
|  3|2025-01-03| 300.0|    MEX|
|  4|2025-01-04| 400.0|    USA|
|  5|2025-01-05| 500.0|    CAN|
+---+----------+------+-------+

+----+-------------+
|code|         name|
+----+-------------+
| USA|United States|
| CAN|       Canada|
| MEX|       Mexico|
+----+-------------+



In [5]:
# realizamos un join normal de ambos
regular_join_df = transactions_df.join(countries_df, transactions_df.country == countries_df.code)
print("regular JOIN")
regular_join_df.show()

regular JOIN
+---+----------+------+-------+----+-------------+
| id|      date|amount|country|code|         name|
+---+----------+------+-------+----+-------------+
|  2|2025-01-02| 200.0|    CAN| CAN|       Canada|
|  5|2025-01-05| 500.0|    CAN| CAN|       Canada|
|  3|2025-01-03| 300.0|    MEX| MEX|       Mexico|
|  1|2025-01-01| 100.0|    USA| USA|United States|
|  4|2025-01-04| 400.0|    USA| USA|United States|
+---+----------+------+-------+----+-------------+



In [6]:
# Uitlizando Broadcast

broadcast_join_df = transactions_df.join(broadcast(countries_df),transactions_df.country == countries_df.code)
print("Broadcast JOIN")
broadcast_join_df.show()


Broadcast JOIN
+---+----------+------+-------+----+-------------+
| id|      date|amount|country|code|         name|
+---+----------+------+-------+----+-------------+
|  1|2025-01-01| 100.0|    USA| USA|United States|
|  2|2025-01-02| 200.0|    CAN| CAN|       Canada|
|  3|2025-01-03| 300.0|    MEX| MEX|       Mexico|
|  4|2025-01-04| 400.0|    USA| USA|United States|
|  5|2025-01-05| 500.0|    CAN| CAN|       Canada|
+---+----------+------+-------+----+-------------+



# Almacenamiento en Cache en Spark

En spark, el almacenamiento en cache permite almacenar en memoria (o disco) los resultados intermedios de las transformaciones en un dataframe o RDD, evitando que se recalculen.

## Proposito del Cache en Spark
- Eficiencia computacional
    - Spark utiliza `lazy evaluation`, lo que significa que las transformaciones no se ejecutan hasta que se realiza una accion.
    - Sin Cache, cada vez que se realiza una accion, Spark recalcula todos los pasos desde el principio.
- Reutilizacion de datos
    - Si necesitas reutilizar el resultado de una transformacion intermedia en multiples operaciones, cache evita la recalculacion.

## Operaciones de cache con Spark
- `cache()` : Indica que los datos deben almacenarse en memoria por defecto, el nivel de almacenamiento predeterminado: `MEMORY_AND_DISK` (almacena en memoria, si no hay suficiente memoria escribe en disco).
- `persist()` : Mas flexible que `cache()` ya que permite especificar niveles de almacenamiento.

```python
df.persist(StorageLevel.MEMORY_AND_DISK)
```

Podemos usar la funcion de Cache/persist para mantener el marco de datos en la memoria, Puede mejorar significativamente el rendimiento de la aplicacion Spark, si alamacenamos en Cache los datos que necesitamos usar con mucha frecuencia en nuestra aplicacion.

## Niveles de almacenamiento

| NIvel de Almacenamiento | Descripcion |
|--------------|--------------|
| `MEMORY_ONLY` | Almacena en memoria. Si no cabe, se recalcula cuando es necesario |
| `MEMORY_AND_DISK` | Almacena en memoria, si no cabe se reescribe en disco |
| `MEMORY_ONLY_SER` | Lo mismo que `MEMORY_ONLY`, pero serializado (Reduce el uso de memoria) |
| `MEMORY_AND_DISK_SER` | Igual que `MEMORY_AND_DISK` pero serializado |
| `DISK_ONLY` | Solamente en disco |
| `OFF_HEAP` |  Usa memoria off-heap (fuera del heap de JAVA), Nesecita configuracion extra |


In [9]:
# Cachea el dataframe
df.cache()

# Fuerza el almacenamiento en cache
df.count()

print("Memory Used : {0}".format(df.storageLevel.useMemory))
print("Disk Used : {0}".format(df.storageLevel.useDisk))

Memory Used : True
Disk Used : True


25/01/05 05:39:18 WARN CacheManager: Asked to cache already cached data.


In [11]:
# En caso que no queramos mas mantener en memoria podemos eliminar el cache. Es una buena practica para cuando ya no requeramos de los datos.
df.unpersist()

sqlContext.clearCache()

# Expreciones con SQL
Se utilizan para ejecutar expreciones SQL directamente dentro de la API de DataFrame, estas funciones son herramientas poderosas cuando queremos aplicas logica SQL sin usar un contexto SQL.
`exp` permite evaluar una expresion SQL en el contexto de un Dataframe, se usa para generar columnas o realizar transformaciones en las existentes.

la funcion `selectExpr` permite seleccionar columnas de un dataframe utilizando expresiones SQL. es una version mas poderosa de select ya que admite expresiones SQL complejas que permite realizar calculos en Linea.



In [12]:
from pyspark.sql.functions import expr

cond = """case when salary > 5000 then 'high_salary'
            else case when salary > 2000 then 'mid_salary'
                else case when salary > 0 then 'low_salary'
                    else 'invalid_salary'
                        end
                    end
                end as salary_level"""

newdf = df.withColumn("salary_level", expr(cond))

newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|NULL|NULL| NULL|  7500| high_salary|
|   9| III| NULL|  4500|  mid_salary|
|  10|NULL|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



In [13]:
newdf = df.selectExpr("*",cond)
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|NULL|NULL| NULL|  7500| high_salary|
|   9| III| NULL|  4500|  mid_salary|
|  10|NULL|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



In [15]:
newdf = df.selectExpr("salary",cond)
newdf.show()

+------+------------+
|salary|salary_level|
+------+------------+
|  1000|  low_salary|
|  1100|  low_salary|
|  3000|  mid_salary|
|  1500|  low_salary|
|  8000| high_salary|
|  7200| high_salary|
|  7100| high_salary|
|  7500| high_salary|
|  4500|  mid_salary|
|  2500|  mid_salary|
+------+------------+



In [None]:
# Funciones Definidas por el usuario
