<a href="https://colab.research.google.com/github/JhonatanWalterSen/spark-in-colab/blob/main/Funciones_extras.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Funciones Fecha, Hora, Strings, Ventana, Optimizer

# Instalaciones

In [1]:
# Instalar Java 8 scas
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
# Descargar Spark
!wget -q http://apache.osuosl.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz

In [3]:
# Descomprimir Spart
!tar xf spark-3.3.1-bin-hadoop3.tgz

In [4]:
# Establecer las variables de Entorno
import os 
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

In [5]:
# Instalar FindSpark en el sistema
!pip install -q findspark

In [6]:
#Session de spark
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Fecha y Hora

In [7]:
data = spark.read.parquet('./convertir')

In [8]:
data.printSchema()

root
 |-- date: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- date_str: string (nullable = true)
 |-- ts_str: string (nullable = true)



In [10]:
data.show(truncate=False)

+----------+-----------------------+----------+----------------+
|date      |timestamp              |date_str  |ts_str          |
+----------+-----------------------+----------+----------------+
|2021-01-01|2021-01-01 20:10:50.723|01-01-2021|18-08-2021 46:58|
+----------+-----------------------+----------+----------------+



In [8]:
from pyspark.sql.functions import col, to_date, to_timestamp

In [10]:
data1 = data.select(
    to_date(col('date')).alias('date1'),
    to_timestamp(col('timestamp')).alias('ts1'),
    to_date(col('date_str'),'dd-MM-yyyy').alias('date2'),
    to_timestamp(col('ts_str'),'dd-MM-yyyy mm:ss').alias('ts2')
)

In [11]:
data1.show(truncate=False)

+----------+-----------------------+----------+-------------------+
|date1     |ts1                    |date2     |ts2                |
+----------+-----------------------+----------+-------------------+
|2021-01-01|2021-01-01 20:10:50.723|2021-01-01|2021-08-18 00:46:58|
+----------+-----------------------+----------+-------------------+



In [12]:
data1.printSchema()

root
 |-- date1: date (nullable = true)
 |-- ts1: timestamp (nullable = true)
 |-- date2: date (nullable = true)
 |-- ts2: timestamp (nullable = true)



In [13]:
data.printSchema()

root
 |-- date: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- date_str: string (nullable = true)
 |-- ts_str: string (nullable = true)



In [14]:
from pyspark.sql.functions import date_format

In [16]:
data1.select(
    date_format(col('date1'),'dd-MM-yyyy')
).show()

+------------------------------+
|date_format(date1, dd-MM-yyyy)|
+------------------------------+
|                    01-01-2021|
+------------------------------+



In [17]:
df = spark.read.parquet('./calculo')

In [18]:
df.show()

+------+-------------+------------+-------------------+
|nombre|fecha_ingreso|fecha_salida|       baja_sistema|
+------+-------------+------------+-------------------+
|  Jose|   2021-01-01|  2021-11-14|2021-10-14 15:35:59|
|Mayara|   2021-02-06|  2021-11-25|2021-11-25 10:35:55|
+------+-------------+------------+-------------------+



In [19]:
df.printSchema()

root
 |-- nombre: string (nullable = true)
 |-- fecha_ingreso: string (nullable = true)
 |-- fecha_salida: string (nullable = true)
 |-- baja_sistema: string (nullable = true)



In [20]:
from pyspark.sql.functions import datediff, months_between, last_day

In [22]:
df.select(
    col('nombre'),
    datediff(col('fecha_salida'),col('fecha_ingreso')).alias('dias'),
    months_between(col('fecha_salida'),col('fecha_ingreso')).alias('meses'),
    last_day(col('fecha_salida')).alias('ultimo_dia_mes')
).show()

+------+----+-----------+--------------+
|nombre|dias|      meses|ultimo_dia_mes|
+------+----+-----------+--------------+
|  Jose| 317|10.41935484|    2021-11-30|
|Mayara| 292| 9.61290323|    2021-11-30|
+------+----+-----------+--------------+



In [23]:
from pyspark.sql.functions import date_add, date_sub

In [25]:
df.select(
    col('nombre'),
    col('fecha_ingreso'),
    date_add(col('fecha_ingreso'),14).alias('mas_14_dias'),
    date_sub(col('fecha_ingreso'),1).alias('menos_1_dia')
).show()

+------+-------------+-----------+-----------+
|nombre|fecha_ingreso|mas_14_dias|menos_1_dia|
+------+-------------+-----------+-----------+
|  Jose|   2021-01-01| 2021-01-15| 2020-12-31|
|Mayara|   2021-02-06| 2021-02-20| 2021-02-05|
+------+-------------+-----------+-----------+



In [26]:
from pyspark.sql.functions import year, month, dayofmonth, dayofyear, hour, minute, second

In [28]:
df.select(
    col('baja_sistema'),
    year(col('baja_sistema')),
    month(col('baja_sistema')),
    dayofmonth(col('baja_sistema')),
    dayofyear(col('baja_sistema')),
    hour(col('baja_sistema')),
    minute(col('baja_sistema')),
    second(col('baja_sistema'))
).show()

+-------------------+------------------+-------------------+------------------------+-----------------------+------------------+--------------------+--------------------+
|       baja_sistema|year(baja_sistema)|month(baja_sistema)|dayofmonth(baja_sistema)|dayofyear(baja_sistema)|hour(baja_sistema)|minute(baja_sistema)|second(baja_sistema)|
+-------------------+------------------+-------------------+------------------------+-----------------------+------------------+--------------------+--------------------+
|2021-10-14 15:35:59|              2021|                 10|                      14|                    287|                15|                  35|                  59|
|2021-11-25 10:35:55|              2021|                 11|                      25|                    329|                10|                  35|                  55|
+-------------------+------------------+-------------------+------------------------+-----------------------+------------------+-----------------

# Funciones para trabajo con Strings

## Trim

In [29]:
dataf = spark.read.parquet('./data.parquet')
dataf.show()

+-------+
| nombre|
+-------+
| Spark |
+-------+



In [30]:
from pyspark.sql.functions import ltrim, rtrim, trim

In [35]:
dataf.select(
    ltrim('nombre').alias('ltrim'),
    rtrim('nombre').alias('rtrim'),
    trim('nombre').alias('trim')
).show()

+------+------+-----+
| ltrim| rtrim| trim|
+------+------+-----+
|Spark | Spark|Spark|
+------+------+-----+



## Pad

In [36]:
from pyspark.sql.functions import col, lpad, rpad

In [39]:
dataf.select(
    trim(col('nombre')).alias('trim')  
).select(
    lpad(col('trim'), 8, '-').alias('lpad'),
    rpad(col('trim'), 8, '=').alias('rpad')
).show()

+--------+--------+
|    lpad|    rpad|
+--------+--------+
|---Spark|Spark===|
+--------+--------+



In [40]:
dfn = spark.createDataFrame([('Spark','es','bueno')],['sujeto','verbo','adjetivo'])

In [41]:
dfn.show()

+------+-----+--------+
|sujeto|verbo|adjetivo|
+------+-----+--------+
| Spark|   es|   bueno|
+------+-----+--------+



## Concat_ws Lower Upper InitCap Reverse

In [42]:
from pyspark.sql.functions import concat_ws, lower, upper, initcap, reverse

In [43]:
dfn.select(
    concat_ws(' ', col('sujeto'),col('verbo'),col('adjetivo')).alias('frase')
).select(
    col('frase'),
    lower(col('frase')).alias('minuscula'),
    upper(col('frase')).alias('mayuscula'),
    initcap(col('frase')).alias('initcap'),
    reverse(col('frase')).alias('reverso')
).show()

+--------------+--------------+--------------+--------------+--------------+
|         frase|     minuscula|     mayuscula|       initcap|       reverso|
+--------------+--------------+--------------+--------------+--------------+
|Spark es bueno|spark es bueno|SPARK ES BUENO|Spark Es Bueno|oneub se krapS|
+--------------+--------------+--------------+--------------+--------------+



## Regexp Replace

In [44]:
from pyspark.sql.functions import regexp_replace

In [46]:
df2 = spark.createDataFrame([('Voy a mi casa por mis llaves',)],['frase'])

In [48]:
df2.show(truncate=False)

+----------------------------+
|frase                       |
+----------------------------+
|Voy a mi casa por mis llaves|
+----------------------------+



In [51]:
df2.select(
    regexp_replace(col('frase'), 'Voy|por', 'ir').alias('nueva_frase')
).show(truncate=False)

+--------------------------+
|nueva_frase               |
+--------------------------+
|ir a mi casa ir mis llaves|
+--------------------------+



# Funciones para trabajo con Colecciones

## Size Sort_array Array_contains

In [52]:
datacoleccion = spark.read.parquet('./parquet')

In [54]:
datacoleccion.show(truncate=False)

+-----+--------------------------------------------+
|dia  |tareas                                      |
+-----+--------------------------------------------+
|lunes|[hacer la tarea, buscar agua, lavar el auto]|
+-----+--------------------------------------------+



In [55]:
datacoleccion.printSchema()

root
 |-- dia: string (nullable = true)
 |-- tareas: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [56]:
from pyspark.sql.functions import col, size, sort_array, array_contains

In [57]:
datacoleccion.select(
    size(col('tareas')).alias('tamanio'),
    sort_array(col('tareas')).alias('arreglo_ordenado'),
    array_contains(col('tareas'),'buscar agua').alias('buscar_agua'),
).show(truncate=False)

+-------+--------------------------------------------+-----------+
|tamanio|arreglo_ordenado                            |buscar_agua|
+-------+--------------------------------------------+-----------+
|3      |[buscar agua, hacer la tarea, lavar el auto]|true       |
+-------+--------------------------------------------+-----------+



## Explode

In [7]:
from pyspark.sql.functions import col,explode

In [59]:
datacoleccion.select(
    col('dia'),
    explode(col('tareas')).alias('tareas')
).show()

+-----+--------------+
|  dia|        tareas|
+-----+--------------+
|lunes|hacer la tarea|
|lunes|   buscar agua|
|lunes| lavar el auto|
+-----+--------------+



In [62]:
#formato JSON
dfjs = spark.read.parquet('./JSON')

In [65]:
dfjs.show(truncate=False)

+---------------------------------------------------------------------------+
|tareas_str                                                                 |
+---------------------------------------------------------------------------+
|{"dia": "lunes","tareas": ["hacer la tarea","buscar agua","lavar el auto"]}|
+---------------------------------------------------------------------------+



In [66]:
dfjs.printSchema()

root
 |-- tareas_str: string (nullable = true)



In [70]:
# Esquema JSON
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

schema_json = StructType(
    [
        StructField('dia', StringType(), True),
        StructField('tareas', ArrayType(StringType()), True),
    ]
)

In [73]:
from pyspark.sql.functions import from_json, to_json

In [74]:
json_df= dfjs.select(
    from_json('tareas_str',schema_json).alias('por_hacer')
)

In [76]:
json_df.printSchema()

root
 |-- por_hacer: struct (nullable = true)
 |    |-- dia: string (nullable = true)
 |    |-- tareas: array (nullable = true)
 |    |    |-- element: string (containsNull = true)



In [77]:
json_df.select(
    col('por_hacer').getItem('dia'),
    col('por_hacer').getItem('tareas'),
    col('por_hacer').getItem('tareas').getItem(0).alias('primer_tarea'),
).show(truncate=False)

+-------------+--------------------------------------------+--------------+
|por_hacer.dia|por_hacer.tareas                            |primer_tarea  |
+-------------+--------------------------------------------+--------------+
|lunes        |[hacer la tarea, buscar agua, lavar el auto]|hacer la tarea|
+-------------+--------------------------------------------+--------------+



In [80]:
#lo convierte a String
json_df.select(
    to_json(col('por_hacer'))
).show(truncate=False)

+-------------------------------------------------------------------------+
|to_json(por_hacer)                                                       |
+-------------------------------------------------------------------------+
|{"dia":"lunes","tareas":["hacer la tarea","buscar agua","lavar el auto"]}|
+-------------------------------------------------------------------------+



## When Coalesce y Lit

In [8]:
datos = spark.read.parquet('./data/')

In [9]:
datos.show()

+------+----+
|nombre|pago|
+------+----+
|  Jose|   1|
| Julia|   2|
| Katia|   1|
|  null|   3|
|  Raul|   3|
+------+----+



In [10]:
from pyspark.sql.functions import col, when, lit, coalesce

In [15]:
datos.select(
    col('nombre'),
    when(col('pago') == 1, 'pagado').
    when(col('pago') == 2, 'Pago').
    when(col('pago') == 3, 'sin pagar').
    otherwise('sin iniciar').alias('Estado')
).show()

+------+---------+
|nombre|   Estado|
+------+---------+
|  Jose|   pagado|
| Julia|     Pago|
| Katia|   pagado|
|  null|sin pagar|
|  Raul|sin pagar|
+------+---------+



In [16]:
datos.select(
    coalesce(col('nombre'),lit('sin nombre')).alias('nombre')
).show()

+----------+
|    nombre|
+----------+
|      Jose|
|     Julia|
|     Katia|
|sin nombre|
|      Raul|
+----------+



# Funciones Definidas por el usuario UDF

In [17]:
def f_cubo(n):
  return n * n * n

In [18]:
from pyspark.sql.types import LongType

In [19]:
spark.udf.register('cubo',f_cubo, LongType())

<function __main__.f_cubo(n)>

In [20]:
spark.range(1,10).createOrReplaceTempView('df_temp')

In [21]:
spark.sql('SELECT id, cubo(id) AS cubo FROM df_temp').show()

+---+----+
| id|cubo|
+---+----+
|  1|   1|
|  2|   8|
|  3|  27|
|  4|  64|
|  5| 125|
|  6| 216|
|  7| 343|
|  8| 512|
|  9| 729|
+---+----+



In [31]:
def bienvenida(nombre):
  return ('Hola {}'.format(nombre))

In [23]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

In [24]:
bienvenida_udf = udf(lambda x: bienvenida(x), StringType())

In [25]:
df_nombres = spark.createDataFrame([('Jhonatan',),('Walter',)], ['nombre'])

In [34]:
df_nombres.show()

+--------+
|  nombre|
+--------+
|Jhonatan|
|  Walter|
+--------+



In [36]:
from pyspark.sql.functions import col

In [None]:
#ERROR 2da forma
df_nombres.select(
    col('nombre'),
    bienvenida_udf(col('nombre')).alias('bie_nombre')
).show()

In [39]:
# Tercera Forma
@udf(returnType=StringType())
def mayuscula(s):
  return s.upper()

In [40]:
df_nombres.select(
    col('nombre'),
    mayuscula(col('nombre')).alias('En Mayúscula')
).show()

+--------+------------+
|  nombre|En Mayúscula|
+--------+------------+
|Jhonatan|    JHONATAN|
|  Walter|      WALTER|
+--------+------------+



In [41]:
#Pandas udf
import pandas as pd

In [42]:
from pyspark.sql.functions import pandas_udf

In [43]:
def cubo_pandas(a: pd.Series) -> pd.Series:
  return a*a*a

In [44]:
cubo_udf = pandas_udf(cubo_pandas,returnType=LongType())

In [45]:
x = pd.Series([1,2,3])

In [46]:
print(cubo_pandas(x))

0     1
1     8
2    27
dtype: int64


In [47]:
dfa=spark.range(5)

In [49]:
dfa.select(
    col('id'),
    cubo_udf(col('id')).alias('cubo_pandas')
).show()

+---+-----------+
| id|cubo_pandas|
+---+-----------+
|  0|          0|
|  1|          1|
|  2|          8|
|  3|         27|
|  4|         64|
+---+-----------+



#Funciones de Ventanas

In [50]:
df_ven = spark.read.parquet('./funciones_ventana.parquet')

In [51]:
df_ven.show()

+-------+----+------------+----------+
| nombre|edad|departamento|evaluacion|
+-------+----+------------+----------+
| Lazaro|  45|      letras|        98|
|   Raul|  24|  matemática|        76|
|  Maria|  34|  matemática|        27|
|   Jose|  30|     química|        78|
| Susana|  51|     química|        98|
|   Juan|  44|      letras|        89|
|  Julia|  55|      letras|        92|
|  Kadir|  38|arquitectura|        39|
| Lilian|  23|arquitectura|        94|
|   Rosa|  26|      letras|        91|
|   Aian|  50|  matemática|        73|
|Yaneisy|  29|      letras|        89|
|Enrique|  40|     química|        92|
|    Jon|  25|arquitectura|        78|
|  Luisa|  39|arquitectura|        94|
+-------+----+------------+----------+



In [55]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, row_number, rank, dense_rank, col

In [53]:
winEsp = Window.partitionBy('departamento').orderBy(desc('evaluacion'))

## Row_Number

In [54]:
df_ven.withColumn('row_number',row_number().over(winEsp)).show()

+-------+----+------------+----------+----------+
| nombre|edad|departamento|evaluacion|row_number|
+-------+----+------------+----------+----------+
| Lilian|  23|arquitectura|        94|         1|
|  Luisa|  39|arquitectura|        94|         2|
|    Jon|  25|arquitectura|        78|         3|
|  Kadir|  38|arquitectura|        39|         4|
| Lazaro|  45|      letras|        98|         1|
|  Julia|  55|      letras|        92|         2|
|   Rosa|  26|      letras|        91|         3|
|   Juan|  44|      letras|        89|         4|
|Yaneisy|  29|      letras|        89|         5|
|   Raul|  24|  matemática|        76|         1|
|   Aian|  50|  matemática|        73|         2|
|  Maria|  34|  matemática|        27|         3|
| Susana|  51|     química|        98|         1|
|Enrique|  40|     química|        92|         2|
|   Jose|  30|     química|        78|         3|
+-------+----+------------+----------+----------+



In [None]:
# 2 primeros de cada departament

In [57]:
df_ven.withColumn('row_number',row_number().over(winEsp)).filter(col('row_number').isin(1,2)).show()

+-------+----+------------+----------+----------+
| nombre|edad|departamento|evaluacion|row_number|
+-------+----+------------+----------+----------+
| Lilian|  23|arquitectura|        94|         1|
|  Luisa|  39|arquitectura|        94|         2|
| Lazaro|  45|      letras|        98|         1|
|  Julia|  55|      letras|        92|         2|
|   Raul|  24|  matemática|        76|         1|
|   Aian|  50|  matemática|        73|         2|
| Susana|  51|     química|        98|         1|
|Enrique|  40|     química|        92|         2|
+-------+----+------------+----------+----------+



## Rank

In [58]:
# un rango al resultadodeja huecos en el rango cuando hay empates
df_ven.withColumn('rank',rank().over(winEsp)).show()

+-------+----+------------+----------+----+
| nombre|edad|departamento|evaluacion|rank|
+-------+----+------------+----------+----+
| Lilian|  23|arquitectura|        94|   1|
|  Luisa|  39|arquitectura|        94|   1|
|    Jon|  25|arquitectura|        78|   3|
|  Kadir|  38|arquitectura|        39|   4|
| Lazaro|  45|      letras|        98|   1|
|  Julia|  55|      letras|        92|   2|
|   Rosa|  26|      letras|        91|   3|
|   Juan|  44|      letras|        89|   4|
|Yaneisy|  29|      letras|        89|   4|
|   Raul|  24|  matemática|        76|   1|
|   Aian|  50|  matemática|        73|   2|
|  Maria|  34|  matemática|        27|   3|
| Susana|  51|     química|        98|   1|
|Enrique|  40|     química|        92|   2|
|   Jose|  30|     química|        78|   3|
+-------+----+------------+----------+----+



## Dense rank

In [59]:
df_ven.withColumn('dense_rank',dense_rank().over(winEsp)).show()

+-------+----+------------+----------+----------+
| nombre|edad|departamento|evaluacion|dense_rank|
+-------+----+------------+----------+----------+
| Lilian|  23|arquitectura|        94|         1|
|  Luisa|  39|arquitectura|        94|         1|
|    Jon|  25|arquitectura|        78|         2|
|  Kadir|  38|arquitectura|        39|         3|
| Lazaro|  45|      letras|        98|         1|
|  Julia|  55|      letras|        92|         2|
|   Rosa|  26|      letras|        91|         3|
|   Juan|  44|      letras|        89|         4|
|Yaneisy|  29|      letras|        89|         4|
|   Raul|  24|  matemática|        76|         1|
|   Aian|  50|  matemática|        73|         2|
|  Maria|  34|  matemática|        27|         3|
| Susana|  51|     química|        98|         1|
|Enrique|  40|     química|        92|         2|
|   Jose|  30|     química|        78|         3|
+-------+----+------------+----------+----------+



In [62]:
from pyspark.sql.functions import min,max,avg

In [65]:
WSAgg = Window.partitionBy('departamento')

In [72]:
(df_ven.withColumn('min',min(col('evaluacion')).over(WSAgg))
    .withColumn('max',max(col('evaluacion')).over(WSAgg))
    .withColumn('avg',avg(col('evaluacion')).over(WSAgg))
    .withColumn('row_number',row_number().over(winEsp))
).show()

+-------+----+------------+----------+---+---+------------------+----------+
| nombre|edad|departamento|evaluacion|min|max|               avg|row_number|
+-------+----+------------+----------+---+---+------------------+----------+
| Lilian|  23|arquitectura|        94| 39| 94|             76.25|         1|
|  Luisa|  39|arquitectura|        94| 39| 94|             76.25|         2|
|    Jon|  25|arquitectura|        78| 39| 94|             76.25|         3|
|  Kadir|  38|arquitectura|        39| 39| 94|             76.25|         4|
| Lazaro|  45|      letras|        98| 89| 98|              91.8|         1|
|  Julia|  55|      letras|        92| 89| 98|              91.8|         2|
|   Rosa|  26|      letras|        91| 89| 98|              91.8|         3|
|   Juan|  44|      letras|        89| 89| 98|              91.8|         4|
|Yaneisy|  29|      letras|        89| 89| 98|              91.8|         5|
|   Raul|  24|  matemática|        76| 27| 76|58.666666666666664|         1|

# Catalyst Optimizer

In [74]:
# plan logico -> catalyst -> Plan Fisica basadas en reglas
data_vuelos = spark.read.parquet('./vuelos.parquet')

In [75]:
data_vuelos.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (null

In [76]:
data_vuelos.show()

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

In [79]:
nuevo_df = (data_vuelos.filter(col('MONTH').isin(6,7,8))
            .withColumn('dis_tiempo_aire',col('DISTANCE') / col('AIR_TIME'))
).select(
    col('AIRLINE'),
    col('dis_tiempo_aire')
).where(col('AIRLINE').isin('AA','DL','AS'))

In [80]:
nuevo_df.explain(True)

== Parsed Logical Plan ==
'Filter 'AIRLINE IN (AA,DL,AS)
+- Project [AIRLINE#648, dis_tiempo_aire#866]
   +- Project [YEAR#644, MONTH#645, DAY#646, DAY_OF_WEEK#647, AIRLINE#648, FLIGHT_NUMBER#649, TAIL_NUMBER#650, ORIGIN_AIRPORT#651, DESTINATION_AIRPORT#652, SCHEDULED_DEPARTURE#653, DEPARTURE_TIME#654, DEPARTURE_DELAY#655, TAXI_OUT#656, WHEELS_OFF#657, SCHEDULED_TIME#658, ELAPSED_TIME#659, AIR_TIME#660, DISTANCE#661, WHEELS_ON#662, TAXI_IN#663, SCHEDULED_ARRIVAL#664, ARRIVAL_TIME#665, ARRIVAL_DELAY#666, DIVERTED#667, ... 8 more fields]
      +- Filter MONTH#645 IN (6,7,8)
         +- Relation [YEAR#644,MONTH#645,DAY#646,DAY_OF_WEEK#647,AIRLINE#648,FLIGHT_NUMBER#649,TAIL_NUMBER#650,ORIGIN_AIRPORT#651,DESTINATION_AIRPORT#652,SCHEDULED_DEPARTURE#653,DEPARTURE_TIME#654,DEPARTURE_DELAY#655,TAXI_OUT#656,WHEELS_OFF#657,SCHEDULED_TIME#658,ELAPSED_TIME#659,AIR_TIME#660,DISTANCE#661,WHEELS_ON#662,TAXI_IN#663,SCHEDULED_ARRIVAL#664,ARRIVAL_TIME#665,ARRIVAL_DELAY#666,DIVERTED#667,... 7 more fields]

#Ejercicios
Adjunto como recurso a esta lección están los archivos movies.csv y movie_ratings.csv. En ambos archivos las columnas están delimitadas por un pip(“|”).

Cada línea del archivo movies.csv representa a un actor que actuó en una película. Si una película tiene diez actores, habrá diez filas para esa película en particular.

1. Calcule la cantidad de películas en las que participó cada actor. La salida debe tener dos columnas: actor y conteo. La salida debe ordenarse por el conteo en orden descendente.

2. Calcule la cantidad de películas producidas cada año. La salida debe tener tres columnas: año, siglo al que pertenece el año y conteo. La salida debe ordenarse por el conteo en orden descendente.

3. Obtenga la película con la calificación más alta por año. La salida debe tener solo una película por año y debe contener tres columnas: año, título de la película y valoración.

In [87]:
peliculas = spark.read.option('sep','|').option('inferSchema','true').option('header','true').csv('./movies.csv')
calificacion = spark.read.option('sep','|').option('inferSchema','true').option('header','true').csv('./movie_ratings.csv')

In [86]:
peliculas.show()

+-----------------+--------------------+----+
|            actor|            pelicula| año|
+-----------------+--------------------+----+
|McClure, Marc (I)|       Freaky Friday|2003|
|McClure, Marc (I)|        Coach Carter|2005|
|McClure, Marc (I)|         Superman II|1980|
|McClure, Marc (I)|           Apollo 13|1995|
|McClure, Marc (I)|            Superman|1978|
|McClure, Marc (I)|  Back to the Future|1985|
|McClure, Marc (I)|Back to the Futur...|1990|
|Cooper, Chris (I)|  Me, Myself & Irene|2000|
|Cooper, Chris (I)|         October Sky|1999|
|Cooper, Chris (I)|              Capote|2005|
|Cooper, Chris (I)|The Bourne Supremacy|2004|
|Cooper, Chris (I)|         The Patriot|2000|
|Cooper, Chris (I)|            The Town|2010|
|Cooper, Chris (I)|          Seabiscuit|2003|
|Cooper, Chris (I)|      A Time to Kill|1996|
|Cooper, Chris (I)|Where the Wild Th...|2009|
|Cooper, Chris (I)|         The Muppets|2011|
|Cooper, Chris (I)|     American Beauty|1999|
|Cooper, Chris (I)|             Sy

In [88]:
peliculas.groupBy('actor').count().orderBy(desc('count')).show()

+-------------------+-----+
|              actor|count|
+-------------------+-----+
|   Tatasciore, Fred|   38|
|      Welker, Frank|   38|
| Jackson, Samuel L.|   32|
|      Harnell, Jess|   31|
|        Damon, Matt|   27|
|      Willis, Bruce|   27|
|  Cummings, Jim (I)|   26|
|         Hanks, Tom|   25|
|   Lynn, Sherry (I)|   25|
|    Bergen, Bob (I)|   25|
|    McGowan, Mickie|   25|
|      Proctor, Phil|   24|
|        Cruise, Tom|   23|
|         Pitt, Brad|   23|
|   Wilson, Owen (I)|   23|
|       Depp, Johnny|   22|
|Freeman, Morgan (I)|   22|
|     Morrison, Rana|   22|
|Williams, Robin (I)|   22|
|      Diaz, Cameron|   21|
+-------------------+-----+
only showing top 20 rows



In [89]:
from pyspark.sql.functions import countDistinct,when, lit

In [96]:
peliculas.groupBy('año').agg(
    countDistinct('pelicula').alias('conteo')
).withColumn('siglo',when((col('año') >= 1900) & (col('año') < 2000), lit('XX')).otherwise(lit('XXI'))).orderBy(desc('conteo')).select(
    col('año'),
    col('siglo'),
    col('conteo')
).show()

+----+-----+------+
| año|siglo|conteo|
+----+-----+------+
|2011|  XXI|    86|
|2004|  XXI|    86|
|2006|  XXI|    86|
|2005|  XXI|    85|
|2008|  XXI|    82|
|2002|  XXI|    81|
|2010|  XXI|    78|
|2000|  XXI|    77|
|2003|  XXI|    76|
|2007|  XXI|    75|
|2001|  XXI|    71|
|2009|  XXI|    68|
|1999|   XX|    67|
|1997|   XX|    66|
|1998|   XX|    59|
|1996|   XX|    42|
|2012|  XXI|    32|
|1995|   XX|    25|
|1994|   XX|    16|
|1986|   XX|    16|
+----+-----+------+
only showing top 20 rows



In [97]:
calificacion.show()

+----------+--------------------+----+
|valoracion|            pelicula| año|
+----------+--------------------+----+
|    1.6339|'Crocodile' Dunde...|1988|
|    7.6177|                  10|1979|
|    1.2864|10 Things I Hate ...|1999|
|    0.3243|           10,000 BC|2008|
|    0.3376|      101 Dalmatians|1996|
|    0.5218|      102 Dalmatians|2000|
|   12.8205|                1066|2012|
|    0.6829|                  12|2007|
|    7.4061|           12 Rounds|2009|
|    2.3677|           127 Hours|2010|
|    1.3585|      13 Going on 30|2004|
|    8.4034|     13 game sayawng|2006|
|      0.59|                1408|2007|
|    4.4292|          15 Minutes|2001|
|    2.2118|           16 Blocks|2006|
|    1.0491|            17 Again|2009|
|    3.9265|                1941|1979|
|   10.4757|2 Days in the Valley|1996|
|       0.4|    2 Fast 2 Furious|2003|
|   11.1111|              2 Guns|2013|
+----------+--------------------+----+
only showing top 20 rows



In [99]:
windowsSpec = Window.partitionBy('año').orderBy(desc('valoracion'))
windowsSpecAgg = Window.partitionBy('año')

In [101]:
''' calificacion.select(row_number().over(windowsSpec),max('valoracion').over(windowsSpecAgg).alias('max_val')
).show() '''

+----------+-------+
|row_number|max_val|
+----------+-------+
|         1| 2.2207|
|         1| 7.9215|
|         2| 7.9215|
|         1| 7.8557|
|         2| 7.8557|
|         1| 1.5053|
|         1|  7.602|
|         1| 9.4226|
|         1| 5.4756|
|         1|10.7625|
|         1| 5.1258|
|         1|14.0607|
|         2|14.0607|
|         1| 6.3919|
|         2| 6.3919|
|         1|10.6375|
|         2|10.6375|
|         1| 0.6726|
|         1|12.8866|
|         2|12.8866|
+----------+-------+
only showing top 20 rows

