# Funciones en PySpark

En este notebook aprenderemos algunas funciones avanzadas para optimizar el rendimiento de Spark, para imputar valores faltantes o a crear funciones definidas por el usuario (UDF).

In [1]:
# Install spark-related dependencies
!wget -q  https://apache.osuosl.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz

!pip install -q findspark
!pip install pyspark
# Set up required environment variables

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=cb9677a556b6799a6d4be25cea3df3a539f8b54e0f402e2ac40cf54467ff181b
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
from pyspark.sql.types import *

### Crea la sesión de SparkSession

In [4]:
spark = SparkSession.builder.getOrCreate()

### Crear el DataFrame

In [5]:
emp = [(1, "AAA", "dept1", 1000),
    (2, "BBB", "dept1", 1100),
    (3, "CCC", "dept1", 3000),
    (4, "DDD", "dept1", 5500),
    (5, "EEE", "dept2", 8000),
    (6, "FFF", "dept2", 9200),
    (7, "GGG", "dept3", 1100),
    (None, None, None, 5500),
    (9, "III", None, 3500),
    (10, None, "dept5", 2500)]

dept = [("dept1", "Department - 1"),
        ("dept2", "Department - 2"),
        ("dept3", "Department - 3"),
        ("dept4", "Department - 4")
       ]

df = spark.createDataFrame(emp, ["id", "name", "dept", "salary"])
deptdf = spark.createDataFrame(dept, ["id", "name"])

# Create Temp Tables
df.createOrReplaceTempView("empdf")
deptdf.createOrReplaceTempView("deptdf")

#  Expresiones SQL

También podemos usar la expresión SQL para la manipulación de datos. Tenemos la función **expr** y también una variante de un método de selección como **selectExpr** para la evaluación de expresiones SQL.

In [6]:
from pyspark.sql.functions import expr

cond = """case when salary > 5000 then 'high_salary'
               else case when salary > 2000 then 'mid_salary'
                    else case when salary > 0 then 'low_salary'
                         else 'invalid_salary'
                              end
                         end
                end as salary_level"""

newdf = df.withColumn("salary_level", expr(cond))
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  5500| high_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  9200| high_salary|
|   7| GGG|dept3|  1100|  low_salary|
|NULL|NULL| NULL|  5500| high_salary|
|   9| III| NULL|  3500|  mid_salary|
|  10|NULL|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



### Usando la función selectExpr

In [7]:
newdf = df.selectExpr("*", cond)
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  5500| high_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  9200| high_salary|
|   7| GGG|dept3|  1100|  low_salary|
|NULL|NULL| NULL|  5500| high_salary|
|   9| III| NULL|  3500|  mid_salary|
|  10|NULL|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



### Funciones definidas por el usuario (UDF)
A menudo necesitamos escribir la función en función de nuestro requisito muy específico. Aquí podemos aprovechar las udfs. Podemos escribir nuestras propias funciones en un lenguaje como python y registrar la función como udf, luego podemos usar la función para operaciones de DataFrame.

* Función de Python para encontrar el nivel_salario para un salario dado.

In [8]:
def detSalary_Level(sal):
    level = None

    if(sal > 5000):
        level = 'high_salary'
    elif(sal > 2000):
        level = 'mid_salary'
    elif(sal > 0):
        level = 'low_salary'
    else:
        level = 'invalid_salary'
    return level

* Luego registre la función "detSalary_Level" como UDF.

In [9]:
sal_level = udf(detSalary_Level, StringType())

* Aplicar función para determinar el salario_level para un salario dado.

In [10]:
newdf = df.withColumn("salary_level", sal_level("salary"))
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  5500| high_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  9200| high_salary|
|   7| GGG|dept3|  1100|  low_salary|
|NULL|NULL| NULL|  5500| high_salary|
|   9| III| NULL|  3500|  mid_salary|
|  10|NULL|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



# Trabajando con valores NULL

Los valores NULL siempre son difíciles de manejar independientemente del Framework o lenguaje que usemos. Aquí en Spark tenemos pocas funciones específicas para lidiar con valores NULL.

**isNull**()

Esta función nos ayudará a encontrar los valores nulos para cualquier columna dada. Por ejemplo si necesitamos encontrar las columnas donde las columnas id contienen los valores nulos.

In [11]:
newdf = df.filter(df["dept"].isNull())
newdf.show()

+----+----+----+------+
|  id|name|dept|salary|
+----+----+----+------+
|NULL|NULL|NULL|  5500|
|   9| III|NULL|  3500|
+----+----+----+------+



**isNotNull**()


Esta función funciona de manera opuesta a la función isNull () y devolverá todos los valores no nulos para una función en particular.

In [12]:
newdf = df.filter(df["dept"].isNotNull())
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  5500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  9200|
|  7| GGG|dept3|  1100|
| 10|NULL|dept5|  2500|
+---+----+-----+------+



**fillna**()

Esta función nos ayudará a reemplazar los valores nulos.

In [13]:
# Replace -1 where the salary is null.
newdf = df.fillna("INVALID", ["dept"])
newdf.show()

+----+----+-------+------+
|  id|name|   dept|salary|
+----+----+-------+------+
|   1| AAA|  dept1|  1000|
|   2| BBB|  dept1|  1100|
|   3| CCC|  dept1|  3000|
|   4| DDD|  dept1|  5500|
|   5| EEE|  dept2|  8000|
|   6| FFF|  dept2|  9200|
|   7| GGG|  dept3|  1100|
|NULL|NULL|INVALID|  5500|
|   9| III|INVALID|  3500|
|  10|NULL|  dept5|  2500|
+----+----+-------+------+



**dropna**()

Esta función nos ayudará a eliminar las filas con valores nulos.

In [14]:
newdf = df.dropna()
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  5500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  9200|
|  7| GGG|dept3|  1100|
+---+----+-----+------+



In [None]:
from random import randint

# create a list of random numbers between 10 to 1000
my_large_list = [randint(10,1000) for x in range(0,20000000)]

# create one partition of the list
my_large_list_one_partition = spark.parallelize(my_large_list,numSlices=1)

# check number of partitions
print(my_large_list_one_partition.getNumPartitions())
# >> 1

# filter numbers greater than equal to 200
my_large_list_one_partition = my_large_list_one_partition.filter(lambda x : x >= 200)

# code was run in a jupyter notebook
# to calculate the time taken to execute the following command
%%time

# count the number of elements in filtered list
print(my_large_list_one_partition.count())
# >> 16162207