In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [2]:
spark = SparkSession.builder \
    .appName("SmallTestDataset") \
    .master("local[*]") \
    .getOrCreate()

In [3]:
#INPUT DATA
input_data_g = "/home/jupyter/proyect/ds_taxi_NY/green_tripdata_2024-01.parquet"
input_data_y = "/home/jupyter/proyect/ds_taxi_NY/yellow_tripdata_2024-01.parquet"

In [4]:
#FUNCIONES
def sumar_missing_per_var(x):
    """conteo de missing en todas las variables del dataset"""
    
    return x.select([F.count( F.when(F.col(c).isNull(),c)  ).alias(c) for c in x.columns]).show()

In [5]:
#READ FILE GREEN
df_g = spark.read.parquet(input_data_g)

#SELECCION VARIABLES GREEN DATASET
lista_vars = ['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'tip_amount',
 'total_amount']

dfg = df_g.select(lista_vars)


# FEATURE duracion_s: duracion del viaje en segundos
dfg = dfg.withColumn('duracion_s', F.unix_timestamp("lpep_dropoff_datetime") - F.unix_timestamp('lpep_pickup_datetime') )\
         .withColumn('duracion_s', F.when(  F.col('duracion_s')<0 , F.col('duracion_s')*-1 ).otherwise(F.col('duracion_s')) )

# TRANSFORMACION REDONDEO TIEMPO, Y DATE GREEN TAXI
dfg = dfg.withColumn("date_init_trip", F.date_format(F.col("lpep_pickup_datetime"), "yyyy-MM-dd"))\
         .withColumn( 'hour_init_trip',     F.hour( F.col("lpep_pickup_datetime") ))\
         .drop(F.col('lpep_pickup_datetime'))\
         .drop( F.col('lpep_dropoff_datetime') )
         

# FEATURE tipoVehiculo: tipo vehiculo
dfg = dfg.withColumn('tipoVehiculo', F.lit('Green'))

In [6]:
dfg.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- duracion_s: long (nullable = true)
 |-- date_init_trip: string (nullable = true)
 |-- hour_init_trip: integer (nullable = true)
 |-- tipoVehiculo: string (nullable = false)



In [7]:
#READ YELLOW
df_y = spark.read.parquet(input_data_y)

#SELECCION VARIABLES YELLOW DATASET
lista_vars = ['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'tip_amount',
 'total_amount']

dfy = df_y.select(lista_vars)

# FEATURE duracion_s: duracion del viaje en segundos
dfy = dfy.withColumn('duracion_s', F.unix_timestamp("tpep_dropoff_datetime") - F.unix_timestamp('tpep_pickup_datetime') )\
         .withColumn('duracion_s', F.when(  F.col('duracion_s')<0 , F.col('duracion_s')*-1 ).otherwise(F.col('duracion_s')) )

# TRANSFORMACION REDONDEO TIEMPO, Y DATE YELLOW TAXI
dfy = dfy.withColumn("date_init_trip", F.date_format(F.col("tpep_pickup_datetime"), "yyyy-MM-dd"))\
         .withColumn( 'hour_init_trip',     F.hour( F.col("tpep_dropoff_datetime") ))\
         .drop(F.col('tpep_pickup_datetime'))\
         .drop( F.col('tpep_dropoff_datetime') )\
         .drop( "date_arrival_trip","hour_arrival_trip"  )

# FEATURE tipoVehiculo: tipo vehiculo
dfy = dfy.withColumn('tipoVehiculo', F.lit('Yellow'))

In [8]:
dfy.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- duracion_s: long (nullable = true)
 |-- date_init_trip: string (nullable = true)
 |-- hour_init_trip: integer (nullable = true)
 |-- tipoVehiculo: string (nullable = false)



In [9]:
dfy.columns == dfg.columns

True

In [10]:
# Union data sets
df = dfg.union(dfy)

In [11]:
df.columns

['VendorID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'tip_amount',
 'total_amount',
 'duracion_s',
 'date_init_trip',
 'hour_init_trip',
 'tipoVehiculo']

In [14]:
# Orden columnas
df = df.select(['VendorID',
                'tipoVehiculo',
                'PULocationID',
                'DOLocationID',
                'date_init_trip',
                'hour_init_trip',
                'duracion_s',
                'passenger_count',
                'trip_distance',
                'tip_amount',
                'total_amount' ])

# Incluir propinas en el precio total
df = df.withColumn('total_amount', F.round(F.col('total_amount')+F.col('tip_amount'),2)  )

In [15]:
df.show()

+--------+------------+------------+------------+--------------+--------------+----------+---------------+-------------+----------+------------+
|VendorID|tipoVehiculo|PULocationID|DOLocationID|date_init_trip|hour_init_trip|duracion_s|passenger_count|trip_distance|tip_amount|total_amount|
+--------+------------+------------+------------+--------------+--------------+----------+---------------+-------------+----------+------------+
|       2|       Green|         236|         239|    2024-01-01|             0|       690|              1|         1.98|      3.61|       28.88|
|       2|       Green|          65|         170|    2024-01-01|             0|      1252|              5|         6.54|      7.11|       56.88|
|       2|       Green|          74|         262|    2024-01-01|             0|      1142|              1|         3.08|       3.0|       34.05|
|       1|       Green|          74|         116|    2024-01-01|             0|       712|              1|          2.4|       0.0

In [None]:
# CREACION DATAMARTS

df_day=