In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz
!tar xf spark-3.2.3-bin-hadoop3.2.tgz
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.3-bin-hadoop3.2"
!pip install -q findspark
!pip install -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m17.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

**FECHAS**

In [3]:
data = spark.read.parquet('./data/convertir')
data.printSchema()
data.show(truncate=False)

root
 |-- date: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- date_str: string (nullable = true)
 |-- ts_str: string (nullable = true)

+----------+-----------------------+----------+----------------+
|date      |timestamp              |date_str  |ts_str          |
+----------+-----------------------+----------+----------------+
|2021-01-01|2021-01-01 20:10:50.723|01-01-2021|18-08-2021 46:58|
+----------+-----------------------+----------+----------------+



In [4]:
# Transformar a date y timestamp
from pyspark.sql.functions import col, to_date, to_timestamp

data1 = data.select(
    to_date(col('date')).alias('date1'), # Ya viene en formato año, mes, día (no es necesario indicar formato)
    to_timestamp(col('timestamp')).alias('ts1'), # Ya viene en formato tipo de timestamp (no es necesario indicar formato)
    to_date(col('date_str'), 'dd-MM-yyyy').alias('date2'),
    to_timestamp(col('ts_str'), 'dd-MM-yyyy mm:ss').alias('ts2')

)
data1.show(truncate=False)
data1.printSchema()

+----------+-----------------------+----------+-------------------+
|date1     |ts1                    |date2     |ts2                |
+----------+-----------------------+----------+-------------------+
|2021-01-01|2021-01-01 20:10:50.723|2021-01-01|2021-08-18 00:46:58|
+----------+-----------------------+----------+-------------------+

root
 |-- date1: date (nullable = true)
 |-- ts1: timestamp (nullable = true)
 |-- date2: date (nullable = true)
 |-- ts2: timestamp (nullable = true)



In [5]:
# Para cambiar el formato
from pyspark.sql.functions import date_format

data1.select(
    date_format(col('date1'), 'dd-MM-yyyy')
).show()

+------------------------------+
|date_format(date1, dd-MM-yyyy)|
+------------------------------+
|                    01-01-2021|
+------------------------------+



In [6]:
df = spark.read.parquet('./data/calculo')
df.show()

+------+-------------+------------+-------------------+
|nombre|fecha_ingreso|fecha_salida|       baja_sistema|
+------+-------------+------------+-------------------+
|  Jose|   2021-01-01|  2021-11-14|2021-10-14 15:35:59|
|Mayara|   2021-02-06|  2021-11-25|2021-11-25 10:35:55|
+------+-------------+------------+-------------------+



In [7]:
from pyspark.sql.functions import datediff, months_between, last_day

df.select(
    col('nombre'),
    datediff(col('fecha_salida'), col('fecha_ingreso')).alias('dias'), # número de días entre dos fechas
    months_between(col('fecha_salida'), col('fecha_ingreso')).alias('meses'), # número de meses entre dos fechas
    last_day(col('fecha_salida')).alias('ultimo_dia_mes') # último día del mes
).show()

+------+----+-----------+--------------+
|nombre|dias|      meses|ultimo_dia_mes|
+------+----+-----------+--------------+
|  Jose| 317|10.41935484|    2021-11-30|
|Mayara| 292| 9.61290323|    2021-11-30|
+------+----+-----------+--------------+



In [8]:
from pyspark.sql.functions import date_add, date_sub

df.select(
    col('nombre'),
    col('fecha_ingreso'),
    date_add(col('fecha_ingreso'), 14).alias('mas_14_dias'), # agregar días
    date_sub(col('fecha_ingreso'), 1).alias('menos_1_dia') # restar días
).show()

+------+-------------+-----------+-----------+
|nombre|fecha_ingreso|mas_14_dias|menos_1_dia|
+------+-------------+-----------+-----------+
|  Jose|   2021-01-01| 2021-01-15| 2020-12-31|
|Mayara|   2021-02-06| 2021-02-20| 2021-02-05|
+------+-------------+-----------+-----------+



In [9]:
from pyspark.sql.functions import year, month, dayofmonth, dayofyear, hour, minute, second

df.select(
    col('baja_sistema'),
    year(col('baja_sistema')), # obtener el año
    month(col('baja_sistema')), # obtener el mes
    dayofmonth(col('baja_sistema')), # obtener el número de día del mes
    dayofyear(col('baja_sistema')), # obtener el número de día del año
    hour(col('baja_sistema')), # obtener hora
    minute(col('baja_sistema')), # obtener minutos
    second(col('baja_sistema')) # obtener segundos
).show()

+-------------------+------------------+-------------------+------------------------+-----------------------+------------------+--------------------+--------------------+
|       baja_sistema|year(baja_sistema)|month(baja_sistema)|dayofmonth(baja_sistema)|dayofyear(baja_sistema)|hour(baja_sistema)|minute(baja_sistema)|second(baja_sistema)|
+-------------------+------------------+-------------------+------------------------+-----------------------+------------------+--------------------+--------------------+
|2021-10-14 15:35:59|              2021|                 10|                      14|                    287|                15|                  35|                  59|
|2021-11-25 10:35:55|              2021|                 11|                      25|                    329|                10|                  35|                  55|
+-------------------+------------------+-------------------+------------------------+-----------------------+------------------+-----------------

**STRINGS**

In [10]:
data = spark.read.parquet('./data/strings')
data.show()

+-------+
| nombre|
+-------+
| Spark |
+-------+



In [11]:
from pyspark.sql.functions import ltrim, rtrim, trim

data.select(
    ltrim('nombre').alias('ltrim'), # Eliminar espacios a la izquierda
    rtrim('nombre').alias('rtrim'), # Eliminar espacios a la derecha
    trim('nombre').alias('trim') # Eliminar espacios de la izquierda y derecha
).show()

+------+------+-----+
| ltrim| rtrim| trim|
+------+------+-----+
|Spark | Spark|Spark|
+------+------+-----+



In [12]:
from pyspark.sql.functions import col, lpad, rpad

data.select(
    trim(col('nombre')).alias('trim') # Elimino los espacios
).select(
    lpad(col('trim'), 8, '-').alias('lpad'), # Rellena a la izquierda (8 es la cantidad de caracteres hasta el que quiero rellenar)
    rpad(col('trim'), 8, '=').alias('rpad') # Rellena a la derecha (8 es la cantidad de caracteres hasta el que quiero rellenar)
).show()

+--------+--------+
|    lpad|    rpad|
+--------+--------+
|---Spark|Spark===|
+--------+--------+



In [13]:
df1 = spark.createDataFrame([('Spark', 'es', 'maravilloso')], ['sujeto', 'verbo', 'adjetivo'])
df1.show()

+------+-----+-----------+
|sujeto|verbo|   adjetivo|
+------+-----+-----------+
| Spark|   es|maravilloso|
+------+-----+-----------+



In [14]:
from pyspark.sql.functions import concat_ws, lower, upper, initcap, reverse

df1.select(
    concat_ws(' ', col('sujeto'), col('verbo'), col('adjetivo')).alias('frase') #concatenar
).select(
    col('frase'),
    lower(col('frase')).alias('minuscula'), # minúscula
    upper(col('frase')).alias('mayuscula'), # mayúscula
    initcap(col('frase')).alias('initcap'), # Mayúscula primer caracter (nompropio)
    reverse(col('frase')).alias('reversa') # Dar vuelta el string
).show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|               frase|           minuscula|           mayuscula|             initcap|             reversa|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|Spark es maravilloso|spark es maravilloso|SPARK ES MARAVILLOSO|Spark Es Maravilloso|osollivaram se krapS|
+--------------------+--------------------+--------------------+--------------------+--------------------+



In [15]:
from pyspark.sql.functions import regexp_replace

df2 = spark.createDataFrame([(' voy a casa por mis llaves',)], ['frase'])

df2.show(truncate=False)

df2.select(
    regexp_replace(col('frase'), 'voy|por', 'ir').alias('nueva_frase') # remplazar expresiones
).show(truncate=False)

+--------------------------+
|frase                     |
+--------------------------+
| voy a casa por mis llaves|
+--------------------------+

+------------------------+
|nueva_frase             |
+------------------------+
| ir a casa ir mis llaves|
+------------------------+



**COLECCIONES**

In [16]:
data = spark.read.parquet('./data/colecciones/')
data.show(truncate=False)
data.printSchema()

+-----+--------------------------------------------+
|dia  |tareas                                      |
+-----+--------------------------------------------+
|lunes|[hacer la tarea, buscar agua, lavar el auto]|
+-----+--------------------------------------------+

root
 |-- dia: string (nullable = true)
 |-- tareas: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [17]:
from pyspark.sql.functions import col, size, sort_array, array_contains

data.select(
    size(col('tareas')).alias('tamaño'), # tamaño del array
    sort_array(col('tareas')).alias('arreglo_ordenado'), # ordena
    array_contains(col('tareas'), 'buscar agua').alias('buscar_agua') #true o false, verifica si está el elemento en el array
).show(truncate=False)


+------+--------------------------------------------+-----------+
|tamaño|arreglo_ordenado                            |buscar_agua|
+------+--------------------------------------------+-----------+
|3     |[buscar agua, hacer la tarea, lavar el auto]|true       |
+------+--------------------------------------------+-----------+



In [18]:
from pyspark.sql.functions import explode

data.select(
    col('dia'),
    explode(col('tareas')).alias('tareas') # Dividir cada elemento del arreglo en una fila
).show()

+-----+--------------+
|  dia|        tareas|
+-----+--------------+
|lunes|hacer la tarea|
|lunes|   buscar agua|
|lunes| lavar el auto|
+-----+--------------+



In [22]:
# Formato JSON
json_df_str = spark.read.parquet('./data/colecciones_JSON')
json_df_str.show(truncate=False)
json_df_str.printSchema()

+---------------------------------------------------------------------------+
|tareas_str                                                                 |
+---------------------------------------------------------------------------+
|{"dia": "lunes","tareas": ["hacer la tarea","buscar agua","lavar el auto"]}|
+---------------------------------------------------------------------------+

root
 |-- tareas_str: string (nullable = true)



In [23]:
# Para transformar una cadena de string a formato JSON
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

schema_json = StructType(
    [
     StructField('dia', StringType(), True),
     StructField('tareas', ArrayType(StringType()), True)
    ]
)


In [24]:
from pyspark.sql.functions import from_json, to_json

json_df = json_df_str.select(
    from_json(col('tareas_str'), schema_json).alias('por_hacer')
)
json_df.printSchema()

root
 |-- por_hacer: struct (nullable = true)
 |    |-- dia: string (nullable = true)
 |    |-- tareas: array (nullable = true)
 |    |    |-- element: string (containsNull = true)



In [25]:
json_df.show()

+--------------------+
|           por_hacer|
+--------------------+
|{lunes, [hacer la...|
+--------------------+



In [27]:
#Para acceder a cada elemento
json_df.select(
    col('por_hacer').getItem('dia'),
    col('por_hacer').getItem('tareas'),
    col('por_hacer').getItem('tareas').getItem(0).alias('primer_tarea')
).show(truncate=False)

# Convertir a un string JSON
json_df.select(
    to_json(col('por_hacer'))
).show(truncate=False)

+-------------+--------------------------------------------+--------------+
|por_hacer.dia|por_hacer.tareas                            |primer_tarea  |
+-------------+--------------------------------------------+--------------+
|lunes        |[hacer la tarea, buscar agua, lavar el auto]|hacer la tarea|
+-------------+--------------------------------------------+--------------+

+-------------------------------------------------------------------------+
|to_json(por_hacer)                                                       |
+-------------------------------------------------------------------------+
|{"dia":"lunes","tareas":["hacer la tarea","buscar agua","lavar el auto"]}|
+-------------------------------------------------------------------------+



**WHEN, COALESCE y LIT**

In [29]:
data = spark.read.parquet('./data/wcyl')
data.show()

+------+----+
|nombre|pago|
+------+----+
|  Jose|   1|
| Julia|   2|
| Katia|   1|
|  null|   3|
|  Raul|   3|
+------+----+



In [30]:
from pyspark.sql.functions import col, when, lit, coalesce

# when para asignar nuevo valor según valores de la columna
data.select(
    col('nombre'),
    when(col('pago') == 1, 'pagado').when(col('pago') == 2, 'sin pagar').otherwise('sin iniciar').alias('pago')
).show()

# coalesce para convertir nulo en otro elemento
# lit literal
data.select(
    coalesce(col('nombre'), lit('sin nombre')).alias('nombre')
).show()


+------+-----------+
|nombre|       pago|
+------+-----------+
|  Jose|     pagado|
| Julia|  sin pagar|
| Katia|     pagado|
|  null|sin iniciar|
|  Raul|sin iniciar|
+------+-----------+

+----------+
|    nombre|
+----------+
|      Jose|
|     Julia|
|     Katia|
|sin nombre|
|      Raul|
+----------+



**FUNCIONES DEFINIDAS POR USUARIO UDF**

In [32]:
def f_cubo(n):
    return n * n * n

In [33]:
from pyspark.sql.types import LongType

# Registrar UDF
#spark.udf.register(nombre_asignado, función, tipo de dato de retorno)
spark.udf.register('cubo', f_cubo, LongType())

# Testeo
spark.range(1,10).createOrReplaceTempView('df_temp')

spark.sql("SELECT id, cubo(id) AS cubo FROM df_temp").show()

+---+----+
| id|cubo|
+---+----+
|  1|   1|
|  2|   8|
|  3|  27|
|  4|  64|
|  5| 125|
|  6| 216|
|  7| 343|
|  8| 512|
|  9| 729|
+---+----+



In [34]:
def bienvenida(nombre):
    return ('Hola {}'.format(nombre))

In [36]:
# Otra opción
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

bienvenida_udf = udf(lambda x: bienvenida(x), StringType())
df_nombre = spark.createDataFrame([('Jose',), ('Julia',)], ['nombre'])
df_nombre.show()
from pyspark.sql.functions import col

df_nombre.select(
    col('nombre'),
    bienvenida_udf(col('nombre')).alias('bie_nombre')
).show()

+------+
|nombre|
+------+
|  Jose|
| Julia|
+------+

+------+----------+
|nombre|bie_nombre|
+------+----------+
|  Jose| Hola Jose|
| Julia|Hola Julia|
+------+----------+



In [37]:
# Otra opción
@udf(returnType=StringType())
def mayuscula(s):
    return s.upper()

df_nombre.select(
    col('nombre'),
    mayuscula(col('nombre')).alias('may_nombre')
).show()

+------+----------+
|nombre|may_nombre|
+------+----------+
|  Jose|      JOSE|
| Julia|     JULIA|
+------+----------+



In [38]:
# Otra opción
import pandas as pd
from pyspark.sql.functions import pandas_udf
def cubo_pandas(a: pd.Series) -> pd.Series:
    return a * a * a

cubo_udf = pandas_udf(cubo_pandas, returnType=LongType())

x = pd.Series([1, 2, 3])

print(cubo_pandas(x))

df = spark.range(5)

df.select(
    col('id'),
    cubo_udf(col('id')).alias('cubo_pandas')
).show()

0     1
1     8
2    27
dtype: int64
+---+-----------+
| id|cubo_pandas|
+---+-----------+
|  0|          0|
|  1|          1|
|  2|          8|
|  3|         27|
|  4|         64|
+---+-----------+



**FUNCIONES DE VENTANA**

In [39]:
df = spark.read.parquet('./data/ventana')
df.show()

+-------+----+------------+----------+
| nombre|edad|departamento|evaluacion|
+-------+----+------------+----------+
| Lazaro|  45|      letras|        98|
|   Raul|  24|  matemática|        76|
|  Maria|  34|  matemática|        27|
|   Jose|  30|     química|        78|
| Susana|  51|     química|        98|
|   Juan|  44|      letras|        89|
|  Julia|  55|      letras|        92|
|  Kadir|  38|arquitectura|        39|
| Lilian|  23|arquitectura|        94|
|   Rosa|  26|      letras|        91|
|   Aian|  50|  matemática|        73|
|Yaneisy|  29|      letras|        89|
|Enrique|  40|     química|        92|
|    Jon|  25|arquitectura|        78|
|  Luisa|  39|arquitectura|        94|
+-------+----+------------+----------+



In [41]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, row_number, rank, dense_rank, col

#¿Cuales son las trabajadores con mayor evaluación de cada departamento?
windowSpec = Window.partitionBy('departamento').orderBy(desc('evaluacion'))

In [42]:
# row_number: asigna un número a cada elemento de la particion a partir de uno hasta el número de elementos de la partición.
df.withColumn('row_number', row_number().over(windowSpec)).filter(col('row_number').isin(1,2)).show()

+-------+----+------------+----------+----------+
| nombre|edad|departamento|evaluacion|row_number|
+-------+----+------------+----------+----------+
| Lilian|  23|arquitectura|        94|         1|
|  Luisa|  39|arquitectura|        94|         2|
| Lazaro|  45|      letras|        98|         1|
|  Julia|  55|      letras|        92|         2|
|   Raul|  24|  matemática|        76|         1|
|   Aian|  50|  matemática|        73|         2|
| Susana|  51|     química|        98|         1|
|Enrique|  40|     química|        92|         2|
+-------+----+------------+----------+----------+



In [43]:
# rank: similar a row number, pero deja un hueco en la asignación si hay un empate
df.withColumn('rank', rank().over(windowSpec)).show()

+-------+----+------------+----------+----+
| nombre|edad|departamento|evaluacion|rank|
+-------+----+------------+----------+----+
| Lilian|  23|arquitectura|        94|   1|
|  Luisa|  39|arquitectura|        94|   1|
|    Jon|  25|arquitectura|        78|   3|
|  Kadir|  38|arquitectura|        39|   4|
| Lazaro|  45|      letras|        98|   1|
|  Julia|  55|      letras|        92|   2|
|   Rosa|  26|      letras|        91|   3|
|   Juan|  44|      letras|        89|   4|
|Yaneisy|  29|      letras|        89|   4|
|   Raul|  24|  matemática|        76|   1|
|   Aian|  50|  matemática|        73|   2|
|  Maria|  34|  matemática|        27|   3|
| Susana|  51|     química|        98|   1|
|Enrique|  40|     química|        92|   2|
|   Jose|  30|     química|        78|   3|
+-------+----+------------+----------+----+



In [44]:
# dense_rank: simila a rank pero no deja hueco si hay empate.
df.withColumn('dense_rank', dense_rank().over(windowSpec)).show()

+-------+----+------------+----------+----------+
| nombre|edad|departamento|evaluacion|dense_rank|
+-------+----+------------+----------+----------+
| Lilian|  23|arquitectura|        94|         1|
|  Luisa|  39|arquitectura|        94|         1|
|    Jon|  25|arquitectura|        78|         2|
|  Kadir|  38|arquitectura|        39|         3|
| Lazaro|  45|      letras|        98|         1|
|  Julia|  55|      letras|        92|         2|
|   Rosa|  26|      letras|        91|         3|
|   Juan|  44|      letras|        89|         4|
|Yaneisy|  29|      letras|        89|         4|
|   Raul|  24|  matemática|        76|         1|
|   Aian|  50|  matemática|        73|         2|
|  Maria|  34|  matemática|        27|         3|
| Susana|  51|     química|        98|         1|
|Enrique|  40|     química|        92|         2|
|   Jose|  30|     química|        78|         3|
+-------+----+------------+----------+----------+



In [45]:
# Agregaciones con especificaciones de ventana
windowSpecAgg = Window.partitionBy('departamento')

from pyspark.sql.functions import min, max, avg
(df.withColumn('min', min(col('evaluacion')).over(windowSpecAgg))
.withColumn('max', max(col('evaluacion')).over(windowSpecAgg))
.withColumn('avg', avg(col('evaluacion')).over(windowSpecAgg))
.withColumn('row_number', row_number().over(windowSpec))
 ).show()

+-------+----+------------+----------+---+---+------------------+----------+
| nombre|edad|departamento|evaluacion|min|max|               avg|row_number|
+-------+----+------------+----------+---+---+------------------+----------+
| Lilian|  23|arquitectura|        94| 39| 94|             76.25|         1|
|  Luisa|  39|arquitectura|        94| 39| 94|             76.25|         2|
|    Jon|  25|arquitectura|        78| 39| 94|             76.25|         3|
|  Kadir|  38|arquitectura|        39| 39| 94|             76.25|         4|
| Lazaro|  45|      letras|        98| 89| 98|              91.8|         1|
|  Julia|  55|      letras|        92| 89| 98|              91.8|         2|
|   Rosa|  26|      letras|        91| 89| 98|              91.8|         3|
|   Juan|  44|      letras|        89| 89| 98|              91.8|         4|
|Yaneisy|  29|      letras|        89| 89| 98|              91.8|         5|
|   Raul|  24|  matemática|        76| 27| 76|58.666666666666664|         1|

**CATALYST OPTIMIZER**

Optimiza las consultas creadas por el usuario

In [None]:
data = spark.read.parquet('./data/')
data.printSchema()
data.show()

from pyspark.sql.functions import col

nuevo_df = (data.filter(col('MONTH').isin(6,7,8))
            .withColumn('dis_tiempo_aire', col('DISTANCE') / col('AIR_TIME'))
).select(
    col('AIRLINE'),
    col('dis_tiempo_aire')
).where(col('AIRLINE').isin('AA', 'DL', 'AS'))

nuevo_df.explain(True) # explica el plan lógico y físico para resolver
