In [11]:
import polars as pl
pl.Config.set_tbl_rows(15) #Esta config controla la cantidad de filas en print y display en Jupyter
pl.Config.set_tbl_cols(50) #Cantidad de columnas a desplegar
pl.Config.set_fmt_str_lengths(100) #Longitud de las cadenas a desplegar
pl.toggle_string_cache(True)

`toggle_string_cache` es neecsario para poder operar eficientemente con variables de tipo `categorical`

## Exploración y limpieza de datos

In [12]:
taxi_sample = pl.read_parquet('../output/taxi.parquet', n_rows=1000000)
taxi_sample.head()

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str
"""1""","""11/13/2018 07:45:26 AM""","""11/13/2018 07:57:39 AM""","""1""","""1.5""","""1""","""N""","""246""","""164""","""1""","""9.5""","""0""","""0.5""","""1""","""0""","""0.3""","""11.3"""
"""2""","""11/13/2018 07:24:47 AM""","""11/13/2018 07:29:45 AM""","""1""","""0.88""","""1""","""N""","""140""","""263""","""1""","""5.5""","""0""","""0.5""","""2""","""0""","""0.3""","""8.3"""
"""2""","""11/13/2018 07:31:34 AM""","""11/13/2018 07:36:04 AM""","""1""","""0.54""","""1""","""N""","""236""","""236""","""2""","""5""","""0""","""0.5""","""0""","""0""","""0.3""","""5.8"""
"""2""","""11/13/2018 07:36:59 AM""","""11/13/2018 07:51:07 AM""","""1""","""2.43""","""1""","""N""","""236""","""48""","""1""","""11.5""","""0""","""0.5""","""2.46""","""0""","""0.3""","""14.76"""
"""2""","""11/13/2018 07:53:40 AM""","""11/13/2018 08:21:53 AM""","""1""","""2.96""","""1""","""N""","""163""","""43""","""1""","""18""","""0""","""0.5""","""0""","""0""","""0.3""","""18.8"""


Definimos todas las funciones que formarán parte de nuestro pipeline de limpieza de datos. Tomar en cuenta que estaremos trabajando con `LazyFrames` debido al alto volumen de datos. 

In [13]:
def drop_columns(dataframe: pl.LazyFrame) -> pl.LazyFrame:
    '''Función que excluye columnas específicas del dataset por simplicidad.'''
    dataframe = dataframe.select(
        pl.exclude('RatecodeID','store_and_fwd_flag','fare_amount','extra','mta_tax','tolls_amount','improvement_surcharge')
    )
    return dataframe

In [14]:
def rename_columns(dataframe: pl.LazyFrame) -> pl.LazyFrame:
    '''Función que estandariza el nombre de las columnas del dataset de taxis de NY'''
    dataframe = dataframe.rename({
        'VendorID' : 'vendor_id',
        'tpep_pickup_datetime' : 'pickup_datetime',
        'tpep_dropoff_datetime' : 'dropoff_datetime',
        'passenger_count' : 'passenger_count',
        'trip_distance' : 'trip_distance',
        'PULocationID' : 'pickup_location_id',
        'DOLocationID' : 'dropoff_location_id',
        'payment_type' : 'payment_type_id',
        'tip_amount' : 'tip_amount',
        'total_amount' : 'total_amount'
    })
    return dataframe

Cuando hacemos `cast` o definimos variables de tipo `categorical` es importante que usemos `set_ordering` con el valor de *lexical*, de lo contrario cuando hagamos `groupby` y/o `sort` no tendremos el ordenamiento esperado:

In [15]:
def cast_column_types(dataframe: pl.LazyFrame) -> pl.LazyFrame:
    '''Función que hace cast al tipo adecuado de datos. Hace uso de expresiones regulares.'''
    dataframe = dataframe.with_columns(
        pl.col("^.*amount$|^.*distance$").cast(pl.Float64),
        pl.col("^.*count$").cast(pl.Int32),
        pl.col("^.*datetime$").str.strptime(pl.Datetime, "%m/%d/%Y %I:%M:%S %p",strict=False),
        pl.col("^.*id$").cast(pl.Categorical).cat.set_ordering("lexical")
    )     
    return dataframe

En ocasiones queremos usar el resultado de alguna `expression` como parámetro a otra. Si el resultado es un único valor podemos usar `item` para usar dicho valor como entrada en otra expression:

In [16]:
def fix_bad_values(dataframe: pl.LazyFrame) -> pl.LazyFrame:
    '''Hay múltiples columnas con valores malformados. Esta función corrige tanto fechas como números'''
    quantile_tip_amount = dataframe.select(pl.col('tip_amount').quantile(0.9999)).collect().item()
    quantile_total_amount = dataframe.select(pl.col('total_amount').quantile(0.9999)).collect().item()
    dataframe = dataframe.with_columns(
        
        pl.col('trip_distance').clip(0,80).keep_name(),
        pl.col('passenger_count').clip(0,9).keep_name(),
        pl.col('tip_amount').clip(0, quantile_tip_amount).keep_name(),
        pl.col('total_amount').clip(0, quantile_total_amount).keep_name(),
        pl.when(pl.col("^.*datetime$").dt.year() != 2018)
            .then(pl.col("^.*datetime$").dt.strftime('2018/%m/%d %H:%M:%S').str.strptime(pl.Datetime, '%Y/%m/%d %H:%M:%S'))
            .otherwise(pl.col("^.*datetime$"))
            .keep_name(),            
    )
    return dataframe
         

En ocasiones necesitamos anidar `with_columns` debido a que queremos construir campos calculados sobre otros campos calculados:

In [17]:
def calculate_columns(dataframe: pl.LazyFrame) -> pl.LazyFrame:
    '''Función que calcula la duración y velocidad de los viajes'''
    dataframe = dataframe.with_columns(
        trip_duration = (pl.col("dropoff_datetime") - pl.col("pickup_datetime")).dt.minutes().clip(0,180),        
    ) \
    .with_columns(
        trip_speed = pl.when((pl.col('trip_duration') > 1) & (pl.col('trip_distance') > 0.1))
          .then((pl.col('trip_distance') * pl.lit(1.60934) * pl.lit(60)) / pl.col('trip_duration')) # Conversión km/h
          .otherwise(None)
    )
    return dataframe

Procedemos a ejecutar el pipeline de limpieza en una muestra para validar el funcionamiento. Como el pipeline espera un `LazyFrame` mediante `lazy` convertimos el `DataFrame` original:

In [18]:
taxi_sample_clean = (taxi_sample.lazy()
            .pipe(drop_columns)
            .pipe(rename_columns)
            .pipe(cast_column_types)
            .pipe(fix_bad_values)
            .pipe(calculate_columns)         
)
taxi_sample_clean.collect().head()

vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_location_id,dropoff_location_id,payment_type_id,tip_amount,total_amount,trip_duration,trip_speed
cat,datetime[μs],datetime[μs],i32,f64,cat,cat,cat,f64,f64,i64,f64
"""1""",2018-11-13 07:45:26,2018-11-13 07:57:39,1,1.5,"""246""","""164""","""1""",1.0,11.3,12,12.07005
"""2""",2018-11-13 07:24:47,2018-11-13 07:29:45,1,0.88,"""140""","""263""","""1""",2.0,8.3,4,21.243288
"""2""",2018-11-13 07:31:34,2018-11-13 07:36:04,1,0.54,"""236""","""236""","""2""",0.0,5.8,4,13.035654
"""2""",2018-11-13 07:36:59,2018-11-13 07:51:07,1,2.43,"""236""","""48""","""1""",2.46,14.76,14,16.760127
"""2""",2018-11-13 07:53:40,2018-11-13 08:21:53,1,2.96,"""163""","""43""","""1""",0.0,18.8,28,10.207814


In [19]:
taxi_sample_clean.collect().describe()

describe,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_location_id,dropoff_location_id,payment_type_id,tip_amount,total_amount,trip_duration,trip_speed
str,str,str,str,f64,f64,str,str,str,f64,f64,f64,f64
"""count""","""1000000""","""1000000""","""1000000""",1000000.0,1000000.0,"""1000000""","""1000000""","""1000000""",1000000.0,1000000.0,1000000.0,1000000.0
"""null_count""","""0""","""0""","""0""",0.0,0.0,"""0""","""0""","""0""",0.0,0.0,0.0,23154.0
"""mean""",,,,1.535628,2.899356,,,,2.137468,17.795293,17.036333,16.39244
"""std""",,,,1.209576,3.88934,,,,2.940436,16.233516,18.056075,10.70029
"""min""",,"""2018-01-01 00:05:38.000000""","""2018-01-01 04:50:14.000000""",0.0,0.0,,,,0.0,0.0,0.0,0.059009
"""max""",,"""2018-12-31 23:03:08.000000""","""2018-12-31 19:03:26.000000""",9.0,80.0,,,,62.27,287.27,180.0,2574.944
"""median""",,,,1.0,1.51,,,,1.55,12.74,12.0,14.112674


Procedemos a ejecutar el pipeline completo en todos los datos. Es importante el uso de `streaming=True` por si tenemos limitante de memoria RAM. De esta forma es posible procesar todo el dataset:

In [20]:
taxi_clean = pl.scan_parquet('../output/taxi.parquet') \
            .pipe(drop_columns) \
            .pipe(rename_columns) \
            .pipe(cast_column_types) \
            .pipe(fix_bad_values) \
            .pipe(calculate_columns)

In [21]:
taxi_clean = taxi_clean.collect(streaming=True)

In [22]:
taxi_clean.describe()

describe,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_location_id,dropoff_location_id,payment_type_id,tip_amount,total_amount,trip_duration,trip_speed
str,str,str,str,f64,f64,str,str,str,f64,f64,f64,f64
"""count""","""112234626""","""112234626""","""112234626""",112234626.0,112234626.0,"""112234626""","""112234626""","""112234626""",112234626.0,112234626.0,112234626.0,112234626.0
"""null_count""","""0""","""0""","""0""",0.0,0.0,"""0""","""0""","""0""",0.0,0.0,0.0,2694977.0
"""mean""",,,,1.595513,2.926978,,,,1.86961,16.322072,14.184362,19.134251
"""std""",,,,1.241607,3.777462,,,,2.503802,14.151166,14.133129,10.547828
"""min""",,"""2018-01-01 00:00:00.000000""","""2018-01-01 00:00:00.000000""",0.0,0.0,,,,0.0,0.0,0.0,0.059009
"""max""",,"""2018-12-31 23:59:59.000000""","""2018-12-31 23:59:59.000000""",9.0,80.0,,,,45.5,232.86,180.0,3862.416
"""median""",,,,1.0,1.6,,,,1.4,11.8,11.0,16.89807


Validamos que algunos campos específicos estén limpios:

In [23]:
taxi_clean.filter(pl.col('trip_duration')<0).height

0

In [24]:
taxi_clean.filter((pl.col('dropoff_datetime').dt.year()>2023) | (pl.col('dropoff_datetime').dt.year()<2017))

vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_location_id,dropoff_location_id,payment_type_id,tip_amount,total_amount,trip_duration,trip_speed
cat,datetime[μs],datetime[μs],i32,f64,cat,cat,cat,f64,f64,i64,f64


In [25]:
taxi_clean.select(pl.all().null_count())

vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_location_id,dropoff_location_id,payment_type_id,tip_amount,total_amount,trip_duration,trip_speed
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
0,0,0,0,0,0,0,0,0,0,0,2694977


In [26]:
taxi_clean.select(pl.col('passenger_count').value_counts()).unnest('passenger_count').sort('passenger_count')

passenger_count,counts
i32,u32
0,1003298
1,79786664
2,16468127
3,4684094
4,2209579
5,5040905
6,3040893
7,425
8,349
9,292


In [27]:
taxi_clean.select(pl.col('dropoff_datetime').dt.year().value_counts()).unnest('dropoff_datetime')

dropoff_datetime,counts
i32,u32
2018,112234626


## Analítica
### Group By y agregaciones

![Group By](../img/groupby.png)

`Groupby` en conjunto con `agg` nos permite realizar agregaciones por grupo.

In [28]:
(taxi_clean.lazy()
    .groupby('payment_type_id')
    .agg(
        pl.col('total_amount').std().suffix('_std'),
        pl.col('total_amount').var().suffix('_var'),
        pl.col('total_amount').median().suffix('_median'),
        pl.col('total_amount').mean().suffix('_mean'),
        pl.col('total_amount').sum().suffix('_sum'),
        pl.col('total_amount').max().suffix('_max'),
        pl.col('total_amount').min().suffix('_min'),
        pl.col('total_amount').count().suffix('_count'),
        pl.col('total_amount').n_unique().suffix('_unique'),
        
    ).sort('payment_type_id')
    .collect()
   )

payment_type_id,total_amount_std,total_amount_var,total_amount_median,total_amount_mean,total_amount_sum,total_amount_max,total_amount_min,total_amount_count,total_amount_unique
cat,f64,f64,f64,f64,f64,f64,f64,u32,u32
"""1""",14.957931,223.739705,12.85,17.652092,1375600000.0,232.86,0.0,77928307,17618
"""2""",11.421088,130.44124,9.8,13.296029,446170000.0,232.86,0.0,33556849,7258
"""3""",16.874474,284.747869,8.3,13.49828,7864100.0,232.86,0.0,582599,2237
"""4""",16.572163,274.636586,8.8,13.5855,2267000.0,232.86,0.0,166868,1215
"""5""",25.727933,661.926533,35.8,32.513333,97.54,56.44,5.3,3,3


`groupby_dynamic` nos permite crear intervalos en campos de `Date` o `Datetime` y en conjunto con `agg` realizar agregaciones:

In [29]:
taxi_daily_stats = (taxi_clean.lazy()
    .sort('pickup_datetime')
    .groupby_dynamic(
        'pickup_datetime', 
        every='1d',
        by='vendor_id')
    .agg(
        pl.count().alias('trips_count'),        
        pl.col(pl.NUMERIC_DTYPES).sum().suffix('_sum'),
        pl.col(pl.NUMERIC_DTYPES).mean().suffix('_mean'),
        pl.col(pl.NUMERIC_DTYPES).median().suffix('_median'),              
    )
)

También es posible hacer acumulaciones mediante la familia de funciones `cum` como `cumsum`:

In [30]:
taxi_daily_cum_stats = taxi_daily_stats.select(
    'vendor_id',
    pl.col('pickup_datetime').cast(pl.Date),
    'trips_count',
    pl.col(r'^.*sum$'),
    pl.col(r'^.*sum$').cumsum().suffix('_cum'),
).collect()

In [31]:
taxi_daily_cum_stats

vendor_id,pickup_datetime,trips_count,passenger_count_sum,trip_distance_sum,tip_amount_sum,total_amount_sum,trip_duration_sum,trip_speed_sum,passenger_count_sum_cum,trip_distance_sum_cum,tip_amount_sum_cum,total_amount_sum_cum,trip_duration_sum_cum,trip_speed_sum_cum
cat,date,u32,i32,f64,f64,f64,i64,f64,i32,f64,f64,f64,i64,f64
"""2""",2018-01-01,141704,270942,488214.54,239074.95,2.3418e6,1756888,3.4909e6,270942,488214.54,239074.95,2.3418e6,1756888,3.4909e6
"""2""",2018-01-02,134592,257630,424210.71,231022.7,2.1644e6,1696152,2.8857e6,528572,912425.25,470097.65,4.5061e6,3453040,6.3766e6
"""2""",2018-01-03,148673,282687,430239.87,256162.3,2.3414e6,1989922,2.8654e6,811259,1.3427e6,726259.95,6.8475e6,5442962,9.2420e6
"""2""",2018-01-04,73429,139186,192898.61,114495.16,1.0727e6,938488,1.3696e6,950445,1.5356e6,840755.11,7.9202e6,6381450,1.0612e7
"""2""",2018-01-05,152386,291141,405175.7,263764.36,2.3533e6,2136171,2.5978e6,1241586,1.9407e6,1.1045e6,1.0273e7,8517621,1.3209e7
"""2""",2018-01-06,160667,309548,457776.51,267176.2,2.4333e6,2031684,3.1863e6,1551134,2.3985e6,1.3717e6,1.2707e7,10549305,1.6396e7
"""2""",2018-01-07,135496,262067,420228.73,240716.98,2.1244e6,1617140,3.0285e6,1813201,2.8187e6,1.6124e6,1.4831e7,12166445,1.9424e7
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
"""4""",2018-12-25,1511,1586,5009.36,2361.26,24090.42,17329,36024.474564,179056396,3.2847e8,2.0981e8,1.8317e9,1591787895,2.0957e9
"""4""",2018-12-26,2343,2491,7065.0,3909.92,38405.93,30748,47332.921475,179058887,3.2847e8,2.0982e8,1.8317e9,1591818643,2.0957e9


Analicemos el plan que se ejecutó:

In [32]:
taxi_daily_stats.select(
    'vendor_id',
    pl.col('pickup_datetime').cast(pl.Date),
    'trips_count',
    pl.col(r'^.*sum$'),
    pl.col(r'^.*sum$').cumsum().suffix('_cum'),
).explain()

' SELECT [col("vendor_id"), col("pickup_datetime").strict_cast(Date), col("trips_count"), col("passenger_count_sum"), col("trip_distance_sum"), col("tip_amount_sum"), col("total_amount_sum"), col("trip_duration_sum"), col("trip_speed_sum"), col("passenger_count_sum").cumsum().alias("passenger_count_sum_cum"), col("trip_distance_sum").cumsum().alias("trip_distance_sum_cum"), col("tip_amount_sum").cumsum().alias("tip_amount_sum_cum"), col("total_amount_sum").cumsum().alias("total_amount_sum_cum"), col("trip_duration_sum").cumsum().alias("trip_duration_sum_cum"), col("trip_speed_sum").cumsum().alias("trip_speed_sum_cum")] FROM\n  AGGREGATE\n  \t[count().alias("trips_count"), col("passenger_count").sum().alias("passenger_count_sum"), col("trip_distance").sum().alias("trip_distance_sum"), col("tip_amount").sum().alias("tip_amount_sum"), col("total_amount").sum().alias("total_amount_sum"), col("trip_duration").sum().alias("trip_duration_sum"), col("trip_speed").sum().alias("trip_speed_sum")]

### Window functions
También podemos ejecutar window functions mediante `over`. Hay que tener mucho cuidado ya que las window functions regresan una fila por cada fila original en la entrada de la función: <br><br>
![Polars Window functions](../img/window.png)

In [33]:
window = pl.col('pickup_datetime').cast(pl.Date).alias('pickup_date')
taxi_window_stats = taxi_clean.lazy()\
    .filter(
        (pl.col('pickup_datetime').dt.month()==12) 
    )\
    .select(
        window,
        (pl.col(pl.NUMERIC_DTYPES) *100 / pl.col(pl.NUMERIC_DTYPES).max().over(window)).suffix('_max_pct')
    )\
    .sort('pickup_date').collect()

In [34]:
taxi_window_stats

pickup_date,passenger_count_max_pct,trip_distance_max_pct,tip_amount_max_pct,total_amount_max_pct,trip_duration_max_pct,trip_speed_max_pct
date,f64,f64,f64,f64,f64,f64
2018-12-01,42.857143,1.1625,4.285714,4.187065,3.333333,2.183099
2018-12-01,14.285714,4.3625,6.065934,7.111569,5.555556,4.915493
2018-12-01,14.285714,0.5,0.0,2.490767,1.666667,1.877934
2018-12-01,28.571429,0.7125,2.197802,3.134931,2.222222,2.007042
2018-12-01,71.428571,4.125,0.0,6.570472,8.333333,3.098592
2018-12-01,85.714286,1.8625,4.307692,5.050245,6.111111,1.90781
2018-12-01,28.571429,3.8625,7.582418,7.407885,8.333333,2.901408
...,...,...,...,...,...,...
2018-12-31,42.857143,1.65,0.0,3.564373,5.0,1.275362
2018-12-31,14.285714,3.5875,0.0,5.282144,5.555556,2.495652


Una alternativa a fuciones comunes de Window en SQL es usar directamente métodos que nos da Polars como `pct_change`, `rank`, `top_k`. En ocasiones debemos usarlas en conjunto con `groupby`:

In [35]:
(taxi_daily_cum_stats.lazy()
    .select(
        pl.col('pickup_datetime').cast(pl.Date),
        pl.col('total_amount_sum_cum').pct_change().alias('total_amount_daily_pct_change'),
        pl.col('total_amount_sum').rank(method='ordinal').alias('total_amount_sum_rank')
    )
).collect()


pickup_datetime,total_amount_daily_pct_change,total_amount_sum_rank
date,f64,u32
2018-01-01,,617
2018-01-02,0.924241,546
2018-01-03,0.519592,616
2018-01-04,0.156659,247
2018-01-05,0.297124,619
2018-01-06,0.236852,632
2018-01-07,0.167183,525
...,...,...
2018-12-25,0.000013,124
2018-12-26,0.000021,136


### Pivot tables

`Pivot` nos permite agregar de manera similar a cómo lo hacen las hojas de cálculo:

In [36]:
(taxi_clean
 .select('total_amount',
    pl.col('pickup_datetime').cast(pl.Date),
    'payment_type_id',
).sort('pickup_datetime')
.pivot(values='total_amount', index='pickup_datetime', aggregate_function='sum', columns='payment_type_id')
.fill_null(0)
)

pickup_datetime,1,2,4,3,5
date,f64,f64,f64,f64,f64
2018-01-01,2.5463e6,1.2894e6,6613.85,21421.6,0.0
2018-01-02,2.5680e6,1.1413e6,4493.48,18189.38,0.0
2018-01-03,2.8910e6,1.1994e6,5551.6,19222.34,0.0
2018-01-04,1.1776e6,570874.96,2359.89,8071.63,0.0
2018-01-05,2.9262e6,1.0938e6,4433.03,17224.93,0.0
2018-01-06,2.9765e6,1.1744e6,5268.63,19602.58,0.0
2018-01-07,2.6601e6,951547.710001,5778.22,16715.7,0.0
...,...,...,...,...,...
2018-12-25,1.2053e6,748053.26,2722.0,7700.65,0.0
2018-12-26,1.9707e6,1.0382e6,4818.51,11042.54,0.0


## Joins y reestructuración de datos

In [37]:
catalog_payment_type = pl.LazyFrame (
    {
        "payment_type_id" : ["1", "2", "3", "4", "5", "6"],
        "payment_type" : ["Credit card", "Cash", "No charge", "Dispute", "Unknown", "Voided trip"],    
    },
    schema= 
    {
        "payment_type_id" :pl.Categorical,
        "payment_type" : pl.Utf8
    } 
    ).collect()
catalog_payment_type

payment_type_id,payment_type
cat,str
"""1""","""Credit card"""
"""2""","""Cash"""
"""3""","""No charge"""
"""4""","""Dispute"""
"""5""","""Unknown"""
"""6""","""Voided trip"""


In [42]:
taxi_pivot = (taxi_clean
 .join(catalog_payment_type, on='payment_type_id', how='left')
 .select('total_amount',
    pl.col('pickup_datetime').cast(pl.Date),
    'payment_type',
).sort('pickup_datetime')
.pivot(values='total_amount', index='pickup_datetime', aggregate_function='sum', columns='payment_type')
.fill_null(0)
)
taxi_pivot

pickup_datetime,Credit card,Cash,Dispute,No charge,Unknown
date,f64,f64,f64,f64,f64
2018-01-01,2.5463e6,1.2894e6,6613.85,21421.6,0.0
2018-01-02,2.5680e6,1.1413e6,4493.48,18189.38,0.0
2018-01-03,2.8910e6,1.1994e6,5551.6,19222.34,0.0
2018-01-04,1.1776e6,570874.96,2359.89,8071.63,0.0
2018-01-05,2.9262e6,1.0938e6,4433.03,17224.93,0.0
2018-01-06,2.9765e6,1.1744e6,5268.63,19602.58,0.0
2018-01-07,2.6601e6,951547.710001,5778.22,16715.7,0.0
...,...,...,...,...,...
2018-12-25,1.2053e6,748053.26,2722.0,7700.65,0.0
2018-12-26,1.9707e6,1.0382e6,4818.51,11042.54,0.0


In [39]:
(catalog_payment_type
 .join(taxi_clean, on='payment_type_id', how='left')
 .select('total_amount',
    pl.col('pickup_datetime').cast(pl.Date),
    'payment_type',
).sort('pickup_datetime')
.pivot(values='total_amount', index='pickup_datetime', aggregate_function='sum', columns='payment_type')
.fill_null(0)
)

pickup_datetime,Voided trip,Credit card,Cash,No charge,Dispute,Unknown
date,f64,f64,f64,f64,f64,f64
,0.0,0.0,0.0,0.0,0.0,0.0
2018-01-01,0.0,2.5463e6,1.2894e6,21421.6,6613.85,0.0
2018-01-02,0.0,2.5680e6,1.1413e6,18189.38,4493.48,0.0
2018-01-03,0.0,2.8910e6,1.1994e6,19222.34,5551.6,0.0
2018-01-04,0.0,1.1776e6,570874.96,8071.63,2359.89,0.0
2018-01-05,0.0,2.9262e6,1.0938e6,17224.93,4433.03,0.0
2018-01-06,0.0,2.9765e6,1.1744e6,19602.58,5268.63,0.0
...,...,...,...,...,...,...
2018-12-25,0.0,1.2053e6,748053.26,7700.65,2722.0,0.0
2018-12-26,0.0,1.9707e6,1.0382e6,11042.54,4818.51,0.0


In [46]:
taxi_pivot.melt(
        id_vars='pickup_datetime', 
        value_vars=['Credit card', 'Cash', 'Dispute', 'No charge', 'Unknown' ],
        variable_name='payment_type')

pickup_datetime,payment_type,value
date,str,f64
2018-01-01,"""Credit card""",2.5463e6
2018-01-02,"""Credit card""",2.5680e6
2018-01-03,"""Credit card""",2.8910e6
2018-01-04,"""Credit card""",1.1776e6
2018-01-05,"""Credit card""",2.9262e6
2018-01-06,"""Credit card""",2.9765e6
2018-01-07,"""Credit card""",2.6601e6
...,...,...
2018-12-25,"""Unknown""",0.0
2018-12-26,"""Unknown""",0.0


In [51]:
taxi_pivot.melt(
        id_vars='pickup_datetime', 
        value_vars=['Credit card', 'Cash', 'Dispute', 'No charge', 'Unknown' ],
        variable_name='payment_type') \
   .transpose()

column_0,column_1,column_2,column_3,column_4,column_5,column_6,column_7,column_8,column_9,column_10,column_11,column_12,column_13,column_14,column_15,column_16,column_17,column_18,column_19,column_20,column_21,column_22,column_23,column_24,...,column_1800,column_1801,column_1802,column_1803,column_1804,column_1805,column_1806,column_1807,column_1808,column_1809,column_1810,column_1811,column_1812,column_1813,column_1814,column_1815,column_1816,column_1817,column_1818,column_1819,column_1820,column_1821,column_1822,column_1823,column_1824
str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,...,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str
"""2018-01-01""","""2018-01-02""","""2018-01-03""","""2018-01-04""","""2018-01-05""","""2018-01-06""","""2018-01-07""","""2018-01-08""","""2018-01-09""","""2018-01-10""","""2018-01-11""","""2018-01-12""","""2018-01-13""","""2018-01-14""","""2018-01-15""","""2018-01-16""","""2018-01-17""","""2018-01-18""","""2018-01-19""","""2018-01-20""","""2018-01-21""","""2018-01-22""","""2018-01-23""","""2018-01-24""","""2018-01-25""",...,"""2018-12-07""","""2018-12-08""","""2018-12-09""","""2018-12-10""","""2018-12-11""","""2018-12-12""","""2018-12-13""","""2018-12-14""","""2018-12-15""","""2018-12-16""","""2018-12-17""","""2018-12-18""","""2018-12-19""","""2018-12-20""","""2018-12-21""","""2018-12-22""","""2018-12-23""","""2018-12-24""","""2018-12-25""","""2018-12-26""","""2018-12-27""","""2018-12-28""","""2018-12-29""","""2018-12-30""","""2018-12-31"""
"""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""","""Credit card""",...,"""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown""","""Unknown"""
"""2546275.0100002615""","""2568009.8600001032""","""2890976.49999896""","""1177616.9100004712""","""2926184.279998829""","""2976543.5799986436""","""2660135.0000000657""","""3216147.5399981397""","""3485634.9099973547""","""3559178.659997209""","""3842651.2299965774""","""3886962.8899964266""","""3431949.989996999""","""3120771.0099982247""","""2914503.7799992533""","""3708112.1099968944""","""3641897.9099968667""","""4033924.1999959415""","""3890434.4199961857""","""3199893.669997696""","""2936256.9299991173""","""3144609.1199982995""","""3457759.5099974643""","""3833515.9999963283""","""4134450.9399956893""",...,"""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0""","""0.0"""


## SQL

In [59]:
sql = pl.SQLContext()
sql.register('taxi_table', taxi_clean.lazy())

In [66]:
sql.query(
    """
        SELECT COUNT (DISTINCT(vendor_id)) AS vendor_id_count_distinct
        FROM taxi_table
    """
)

vendor_id_count_distinct
u32
3


In [65]:
sql.query(
    """
        SELECT COUNT (DISTINCT total_amount)
        FROM taxi_table
    """
)

total_amount
u32
17953


In [68]:
sql.register('cat_payment', catalog_payment_type.lazy())

In [71]:
sql.query(
    """
        SELECT payment_type_id::string, total_amount
        FROM taxi_table
        LIMIT 10
    """
)

payment_type_id,total_amount
str,f64
"""1""",11.3
"""1""",8.3
"""2""",5.8
"""1""",14.76
"""1""",18.8
"""1""",6.5
"""1""",50.47
"""1""",12.96
"""1""",9.02
"""1""",11.76


In [73]:
sql.query(
      """
        SELECT payment_type_id,
               sum(total_amount)
        FROM taxi_table
        GROUP BY payment_type_id
    """
)

payment_type_id,total_amount
cat,f64
"""1""",1375600000.0
"""4""",2267000.0
"""3""",7864100.0
"""2""",446170000.0
"""5""",97.54


Por el momento los `joins` aún no están implementados

In [72]:
sql.query(
    """
        SELECT payment_type, total_amount
        FROM taxi_table tt
        LEFT JOIN cat_payment cp
        ON tt.payment_type_id= cp.payment_type_id
        LIMIT 10
    """
)

ComputeError: SQL join constraint On(BinaryOp { left: CompoundIdentifier([Ident { value: "tt", quote_style: None }, Ident { value: "payment_type_id", quote_style: None }]), op: Eq, right: CompoundIdentifier([Ident { value: "cp", quote_style: None }, Ident { value: "payment_type_id", quote_style: None }]) }) is not yet supported