# Demo Spark en Cluster

Este demo utiliza los siguientes datos [New York City Taxi and Limousine Commission (TLC) Trip Record Data](https://aws.amazon.com/marketplace/pp/prodview-okyonroqg5b2u?sr=0-1&ref_=beagle&applicationId=AWSMPContessa#usage)

In [1]:
%%html
<style>
div.output_area pre {
    white-space: pre;
}
</style>

In [24]:
# Importar librerías
from pyspark.sql import SparkSession

# Importar funciones únicas
from pyspark.sql.functions import lit, col
from pyspark.sql.types import IntegerType,BooleanType,DateType

# Importar funciones con un alias, que pudieran 
# tener conflicto con las librerías base de Python, 
# como sum, avg, round, ...
import pyspark.sql.functions as F

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Recuerda que vamos a trabajar con spark dataframes, por lo tanto tienes que abrir un SparkSession

In [3]:
spark = SparkSession.builder.appName('Productos electronicos').getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Hay que cargar los datasets, nota que podemos utilizar wildcards para leer varios parquets al mismo tiempo.

In [6]:
prueba = spark.read.option("header", False).csv("s3://itam-analytics-abcd/2022")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
prueba.show(10, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+---------------------------------------------+--------------+----------------------------------+------------+------+----------+------------+-------------------------------------+------------------------------------+---------------------------------------------------------------------------------+--------------+--------------+---------+-----------+
|_c0            |_c1                                          |_c2           |_c3                               |_c4         |_c5   |_c6       |_c7         |_c8                                  |_c9                                 |_c10                                                                             |_c11          |_c12          |_c13     |_c14       |
+---------------+---------------------------------------------+--------------+----------------------------------+------------+------+----------+------------+-------------------------------------+------------------------------------+----------------------------------

In [8]:
prueba.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

14449481

In [9]:
prueba = prueba.withColumnRenamed("_c0", "producto")\
       .withColumnRenamed("_c1", "presentacion")\
       .withColumnRenamed("_c2", "marca")\
       .withColumnRenamed("_c3", "categoria")\
       .withColumnRenamed("_c4", "catalogo")\
       .withColumnRenamed("_c5", "precio")\
       .withColumnRenamed("_c6", "fechaRegistro")\
       .withColumnRenamed("_c7", "cadenaComercial")\
       .withColumnRenamed("_c8", "giro")\
       .withColumnRenamed("_c9", "nombreComercial")\
       .withColumnRenamed("_c10", "direccion")\
       .withColumnRenamed("_c11", "estado")\
       .withColumnRenamed("_c12", "municipio")\
       .withColumnRenamed("_c13", "latitud")\
       .withColumnRenamed("_c14", "longitud")\

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
prueba.show(5,False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+---------------------------------------------+-------+---------------------------------+------------+------+-------------+---------------+-------------------------------------+------------------------------------+---------------------------------------------------------------------------------+--------------+--------------+---------+-----------+
|producto|presentacion                                 |marca  |categoria                        |catalogo    |precio|fechaRegistro|cadenaComercial|giro                                 |nombreComercial                     |direccion                                                                        |estado        |municipio     |latitud  |longitud   |
+--------+---------------------------------------------+-------+---------------------------------+------------+------+-------------+---------------+-------------------------------------+------------------------------------+-------------------------------------------------------------

In [31]:
prueba = prueba.withColumn("precio",col("precio").cast(IntegerType()))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
prueba.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- producto: string (nullable = true)
 |-- presentacion: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- catalogo: string (nullable = true)
 |-- precio: integer (nullable = true)
 |-- fechaRegistro: string (nullable = true)
 |-- cadenaComercial: string (nullable = true)
 |-- giro: string (nullable = true)
 |-- nombreComercial: string (nullable = true)
 |-- direccion: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- latitud: string (nullable = true)
 |-- longitud: string (nullable = true)

In [42]:
precios = prueba.select("precio")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [50]:
precios = precios.sort(col("precio").desc())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+
|precio|
+------+
|94248 |
|92856 |
|92856 |
|92856 |
|92856 |
|84181 |
|84181 |
|77141 |
|77141 |
|77141 |
+------+
only showing top 10 rows

In [13]:
(prueba
     .write
     .partitionBy('catalogo')
     .mode('overwrite')
     .parquet("s3://itam-analytics-abcd/preprocessing/unified_parquet")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Creemos un resumen global de la distancia promedio, el costo por milla promedio y el costo promedio.

In [10]:
(
    tbl_union
        .select(
            F.round(F.avg('trip_distance'),2).alias('avg_dist'),
            F.round(F.avg(col('total_amount')/col('trip_distance')),2).alias('avg_cost_per_mile'),
            F.round(F.avg('total_amount'),2).alias('avg_cost'))
        
).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-----------------+--------+
|avg_dist|avg_cost_per_mile|avg_cost|
+--------+-----------------+--------+
|    7.59|            12.71|   21.62|
+--------+-----------------+--------+

Repitamos la misma operación pero ahora agrupando por tipo de taxi.

In [11]:
(tbl_union
     .groupBy('type')
     .agg(
         F.round(F.avg('trip_distance'),2).alias('avg_dist'),
         F.round(F.avg(col('total_amount')/col('trip_distance')),2).alias('avg_cost_per_mile'),
         F.round(F.avg('total_amount'),2).alias('avg_cost')) 
).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+--------+-----------------+--------+
|  type|avg_dist|avg_cost_per_mile|avg_cost|
+------+--------+-----------------+--------+
|yellow|    5.96|            12.68|   21.67|
| green|   84.45|            14.51|   19.32|
+------+--------+-----------------+--------+

Ahora hagamos algo nuevo, imagina que estos parquets queremos convertirlos en un directorio particionado para poder consultarlo en Athena de manera eficiente.

Los datos los vamos a consultar por tipo, año y mes. Eso quiere decir que hay que guardar este dataframe particionado por estas variables.

In [12]:
tbl_yellow_taxi_prime = (
    tbl_yellow_taxi
     .withColumn("type", lit("yellow"))
     .withColumn("year", F.year('tpep_pickup_datetime'))
     .withColumn("month", F.month('tpep_pickup_datetime'))
     .select(
         'type', 
         'year', 
         'month', 
         'VendorID', 
         col('tpep_pickup_datetime').alias('pickup_datetime'), 
         'passenger_count', 
         'trip_distance', 
         'PULocationID', 
         'DOLocationID', 
         'fare_amount',
         'extra',
         'mta_tax',
         'tip_amount',
         'tolls_amount',
         'improvement_surcharge',
         'total_amount',
         'payment_type',
         'congestion_surcharge')
)

tbl_green_taxi_prime = (tbl_green_taxi
     .withColumn("type", lit("green"))
     .withColumn("year", F.year('lpep_pickup_datetime'))
     .withColumn("month", F.month('lpep_pickup_datetime'))
     .select(
         'type', 
         'year', 
         'month', 
         'VendorID', 
         col('lpep_pickup_datetime').alias('pickup_datetime'), 
         'passenger_count', 
         'trip_distance', 
         'PULocationID', 
         'DOLocationID', 
         'fare_amount',
         'extra',
         'mta_tax',
         'tip_amount',
         'tolls_amount',
         'improvement_surcharge',
         'total_amount',
         'payment_type',
         'congestion_surcharge')
)

tbl_taxi_prime_union = tbl_yellow_taxi_prime.union(tbl_green_taxi_prime)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Primero vamos a guardar el archivo como un solo CSV. Para eso usamos coalesce, para decirle a Spark que queremos juntar todas las particiones que están en los nodos en una sola partición. Ojo esta operación es costosa en computo.

Por ejemplo, el DataFrame original que cargamos tiene el siguiente número de particiones.

In [13]:
tbl_taxi_prime_union.rdd.getNumPartitions

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<bound method RDD.getNumPartitions of MapPartitionsRDD[59] at javaToPython at NativeMethodAccessorImpl.java:0>

Ahora reduzcamoslo a 1 partición.

In [15]:
(tbl_taxi_prime_union
    .filter(col('year') == 2022)
    .coalesce(1)
     .write
     .mode('overwrite')
     .csv("s3://itam-analytics-brandon/taxi_prep_csv")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Segundo vamos a guardar el archivo particionado por type, year y month como parquet.

In [17]:
(tbl_taxi_prime_union
     .filter(col('year') == 2022)
     .write
     .partitionBy('type', 'year', 'month')
     .mode('overwrite')
     .parquet("s3://itam-analytics-brandon/taxi_prep_parquet")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Observa que el path indica que estamos creando carpetas diferentes para que cuando queramos consultar con Athena, cada carpeta sea una tabla diferente. La idea es que observes la diferencia en consultar ambas tablas en Athena, para que 
veas el desempeño en tiempo y costo.