# Demo Spark en Cluster

Este demo utiliza los siguientes datos [New York City Taxi and Limousine Commission (TLC) Trip Record Data](https://aws.amazon.com/marketplace/pp/prodview-okyonroqg5b2u?sr=0-1&ref_=beagle&applicationId=AWSMPContessa#usage)

In [1]:
# Importar librerías
from pyspark.sql import SparkSession

# Importar funciones únicas
from pyspark.sql.functions import lit, col

# Importar funciones con un alias, que pudieran 
# tener conflicto con las librerías base de Python, 
# como sum, avg, round, ...
import pyspark.sql.functions as F

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1679898994899_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Recuerda que vamos a trabajar con spark dataframes, por lo tanto tienes que abrir un SparkSession

In [2]:
spark = SparkSession.builder.appName('New York City Taxi').getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Hay que cargar los datasets, nota que podemos utilizar wildcards para leer varios parquets al mismo tiempo.

In [3]:
tbl_yellow_taxi =  spark.read.option("header",True).parquet("s3://itam-analytics-MINOMBRE/taxi/yellow_tripdata_2022-*.parquet")
tbl_green_taxi =  spark.read.option("header",True).parquet("s3://itam-analytics-MINOMBRE/taxi/green_tripdata_2022-*.parquet")



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Veamos las columnas.

In [14]:
tbl_yellow_taxi.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)

In [15]:
tbl_green_taxi.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

Observa como hacer counts, toma tiempo.

In [4]:
tbl_yellow_taxi.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

39656098

In [5]:
tbl_green_taxi.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

840402

Declaremos un pipeline para unir los dataframes de los taxis amarillos y los verdes. Nota que es lazy. Hasta ahora solo es un pipeline que encadena varios transformers.

In [11]:
tbl_union = (
    tbl_yellow_taxi
        .select(col('trip_distance'), col('total_amount'))
        .withColumn("type", lit("yellow"))
     .union(
         tbl_green_taxi
             .select(col('trip_distance'), col('total_amount'))
             .withColumn("type", lit("green"))
     )
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Observa como haciendo un count, se ejecuta en el cluster el pipeline.

In [12]:
tbl_union.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

40496500

Creemos un resumen global de la distancia promedio, el costo por milla promedio y el costo promedio.

In [8]:
(
    tbl_union
        .select(
            F.round(F.avg('trip_distance'),2).alias('avg_dist'),
            F.round(F.avg(col('total_amount')/col('trip_distance')),2).alias('avg_cost_per_mile'),
            F.round(F.avg('total_amount'),2).alias('avg_cost'))
        
).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-----------------+--------+
|avg_dist|avg_cost_per_mile|avg_cost|
+--------+-----------------+--------+
|    7.59|            12.71|   21.62|
+--------+-----------------+--------+

Repitamos la misma operación pero ahora agrupando por tipo de taxi.

In [9]:
(tbl_union
     .groupBy('type')
     .agg(
         F.round(F.avg('trip_distance'),2).alias('avg_dist'),
         F.round(F.avg(col('total_amount')/col('trip_distance')),2).alias('avg_cost_per_mile'),
         F.round(F.avg('total_amount'),2).alias('avg_cost')) 
).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+--------+-----------------+--------+
|  type|avg_dist|avg_cost_per_mile|avg_cost|
+------+--------+-----------------+--------+
|yellow|    5.96|            12.68|   21.67|
| green|   84.45|            14.51|   19.32|
+------+--------+-----------------+--------+

Ahora hagamos algo nuevo, imagina que estos parquets queremos convertirlos en un directorio particionado para poder consultarlo en Athena de manera eficiente.

Los datos los vamos a consultar por tipo, año y mes. Eso quiere decir que hay que guardar este dataframe particionado por estas variables.

In [30]:
tbl_yellow_taxi_prime = (
    tbl_yellow_taxi
     .withColumn("type", lit("yellow"))
     .withColumn("year", F.year('tpep_pickup_datetime'))
     .withColumn("month", F.month('tpep_pickup_datetime'))
     .select(
         'type', 
         'year', 
         'month', 
         'VendorID', 
         col('tpep_pickup_datetime').alias('pickup_datetime'), 
         'passenger_count', 
         'trip_distance', 
         'PULocationID', 
         'DOLocationID', 
         'fare_amount',
         'extra',
         'mta_tax',
         'tip_amount',
         'tolls_amount',
         'improvement_surcharge',
         'total_amount',
         'payment_type',
         'congestion_surcharge')
)

tbl_green_taxi_prime = (tbl_green_taxi
     .withColumn("type", lit("green"))
     .withColumn("year", F.year('lpep_pickup_datetime'))
     .withColumn("month", F.month('lpep_pickup_datetime'))
     .select(
         'type', 
         'year', 
         'month', 
         'VendorID', 
         col('lpep_pickup_datetime').alias('pickup_datetime'), 
         'passenger_count', 
         'trip_distance', 
         'PULocationID', 
         'DOLocationID', 
         'fare_amount',
         'extra',
         'mta_tax',
         'tip_amount',
         'tolls_amount',
         'improvement_surcharge',
         'total_amount',
         'payment_type',
         'congestion_surcharge')
)

tbl_taxi_prime_union = tbl_yellow_taxi_prime.union(tbl_green_taxi_prime)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Primero vamos a guardar el archivo como un solo CSV. Para eso usamos coalesce, para decirle a Spark que queremos juntar todas las particiones que están en los nodos en una sola partición. Ojo esta operación es costosa en computo.

Por ejemplo, el DataFrame original que cargamos tiene el siguiente número de particiones.

In [36]:
tbl_taxi_prime_union.rdd.getNumPartitions

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<bound method RDD.getNumPartitions of MapPartitionsRDD[121] at javaToPython at NativeMethodAccessorImpl.java:0>

Ahora reduzcamoslo a 1 partición.

In [None]:
(tbl_taxi_prime_union
    .filter(col('year') == 2022)
    .coalesce(1)
     .write
     .mode('overwrite')
     .csv("s3://itam-analytics-MINOMBRE/taxi_prep_csv")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Segundo vamos a guardar el archivo particionado por type, year y month como parquet.

In [33]:
(tbl_taxi_prime_union
     .filter(col('year') == 2022)
     .write
     .partitionBy('type', 'year', 'month')
     .mode('overwrite')
     .parquet("s3://itam-analytics-MINOMBRE/taxi_prep_parquet")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Observa que el path indica que estamos creando carpetas diferentes para que cuando queramos consultar con Athena, cada carpeta sea una tabla diferente. La idea es que observes la diferencia en consultar ambas tablas en Athena, para que 
veas el desempeño en tiempo y costo.