In [12]:
# Read the data
# Remember to download the data before
import polars as pl
import os
import glob

data_source = os.path.join("..", "data", "taxi")

# Get a list of all parquet files in the directory
parquet_files = glob.glob(os.path.join(data_source, "*.parquet"))

# Read each file into a dataframe and concatenate them
data = pl.concat([pl.scan_parquet(file) for file in parquet_files])

print(data.schema)

# interesting link to understand this concepts
# https://til.simonwillison.net/duckdb/remote-parquet

OrderedDict([('VendorID', Int32), ('tpep_pickup_datetime', Datetime(time_unit='ns', time_zone=None)), ('tpep_dropoff_datetime', Datetime(time_unit='ns', time_zone=None)), ('passenger_count', Int64), ('trip_distance', Float64), ('RatecodeID', Int64), ('store_and_fwd_flag', Utf8), ('PULocationID', Int32), ('DOLocationID', Int32), ('payment_type', Int64), ('fare_amount', Float64), ('extra', Float64), ('mta_tax', Float64), ('tip_amount', Float64), ('tolls_amount', Float64), ('improvement_surcharge', Float64), ('total_amount', Float64), ('congestion_surcharge', Float64), ('Airport_fee', Float64)])


In [13]:
# Select columns
data.select(["VendorID", "total_amount"]).head()

In [14]:
# Selectors to use expressions
import polars.selectors as cs

data.select(
    cs.integer() - cs.first() | cs.temporal()
)

data.select(
    (cs.integer() - cs.contains("LocationID")).as_expr().mean()
).fetch()
# fetch it's interesting because can help on execute and see the result without computing everything.
# collect will execute and optimize everything

VendorID,passenger_count,RatecodeID,payment_type
f64,f64,f64,f64
1.72375,1.3815,1.31075,1.2355


In [23]:
# A more advanced example with a custom expression and renames
data.select(
    (pl.col("trip_distance") * 1000).alias("trip_distance_meters"),
    (cs.contains("amount").as_expr() * 1000).name.suffix("_cents"),
    # cs .contains("amount").as_expr().mul(1000).floordiv(10)
).fetch()

trip_distance_meters,fare_amount_cents,tip_amount_cents,tolls_amount_cents,total_amount_cents
f64,f64,f64,f64,f64
4900.0,28900.0,6000.0,0.0,39900.0
21890.0,70000.0,0.0,6550.0,81800.0
1300.0,11400.0,2000.0,0.0,18400.0
1500.0,10000.0,1000.0,0.0,16000.0
1490.0,11400.0,1000.0,0.0,17400.0
1200.0,9300.0,3550.0,0.0,17850.0
8610.0,33800.0,10320.0,6550.0,61920.0
3880.0,21900.0,6720.0,0.0,33620.0
8000.0,31000.0,0.0,6550.0,48800.0
1480.0,9300.0,2860.0,0.0,17160.0


In [33]:
# Compress the memory casting and save again to parquet
import polars.selectors as cs

(
    data
    .with_columns(
        cs.numeric().as_expr().shrink_dtype(),
        pl.col(pl.Int8).cast(pl.UInt8),
        cs.string().as_expr().cast(pl.Categorical),
        cs.temporal().as_expr().cast(pl.Datetime(time_unit="ms"))
    )
    .sink_parquet("../data/test.parquet")
)

In [35]:
data.select(pl.col("payment_type").n_unique()).collect()

payment_type
u32
6


In [40]:
# you have a lot of functions available, for example to see the unique
%time data.select(pl.all().n_unique()).collect()


CPU times: user 12.7 s, sys: 5.42 s, total: 18.1 s
Wall time: 931 ms


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
3,12806518,12801318,11,7722,8,3,262,262,6,13843,186,16,7017,2532,5,31446,7,8


In [41]:
# you have approximate functions
%time data.select(pl.all().approx_n_unique()).collect()

CPU times: user 8.43 s, sys: 3.15 s, total: 11.6 s
Wall time: 718 ms


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
3,12822105,12754204,11,7740,8,3,261,261,6,13681,186,16,6983,2548,5,31369,7,8


In [43]:
# namespaces
shrunk = (
    data
    .with_columns(
        cs.numeric().as_expr().shrink_dtype(),
        pl.col(pl.Int8).cast(pl.UInt8),
        cs.string().as_expr().cast(pl.Categorical),
        cs.temporal().as_expr().cast(pl.Datetime(time_unit="ms"))
    )
)

# you can isolate pieces of plans, concatenate expression and move only transformations.
# this is a logic plan and you can move it around
# this is completely lazy and transparent
shrunk

In [51]:
# I'm adding a new operation inside the shrunk plan created before
(
    shrunk
    .with_columns(
        cs.temporal().as_expr().dt.day().name.suffix("_days")
    ).explain(optimized=True)
)
# ignore all the previous operations and just execute the last one
# it can do subquery pruning and optimization keeping only what is necessary

' WITH_COLUMNS:\n [col("tpep_pickup_datetime").dt.day().alias("tpep_pickup_datetime_days"), col("tpep_dropoff_datetime").dt.day().alias("tpep_dropoff_datetime_days")]\n   WITH_COLUMNS:\n   [col("VendorID").shrink_dtype(), col("passenger_count").shrink_dtype(), col("trip_distance").shrink_dtype(), col("RatecodeID").shrink_dtype(), col("PULocationID").shrink_dtype(), col("DOLocationID").shrink_dtype(), col("payment_type").shrink_dtype(), col("fare_amount").shrink_dtype(), col("extra").shrink_dtype(), col("mta_tax").shrink_dtype(), col("tip_amount").shrink_dtype(), col("tolls_amount").shrink_dtype(), col("improvement_surcharge").shrink_dtype(), col("total_amount").shrink_dtype(), col("congestion_surcharge").shrink_dtype(), col("Airport_fee").shrink_dtype(), col("store_and_fwd_flag").strict_cast(Categorical(None)), col("tpep_pickup_datetime").strict_cast(Datetime(Milliseconds, None)), col("tpep_dropoff_datetime").strict_cast(Datetime(Milliseconds, None))]\n    UNION\n      PLAN 0:\n\n     

In [57]:
# another example more complex
# I'm adding a new operation inside the shrunk plan created before
(
    shrunk
    .filter(
        pl.col("tpep_pickup_datetime").dt.day().eq(1)
    )
    .select(
        pl.col("tpep_pickup_datetime").cast(pl.Utf8).str.split(" ").map_elements(lambda s: s.str.to_uppercase()),
    )
    .fetch()
)

VendorID,total_amount
i8,list[f32]
1,"[39.900002, 18.4, … 12.9]"
2,"[81.800003, 17.4, … 15.48]"


In [None]:
# another example more complex
(
    shrunk
    .filter(
        pl.col("tpep_pickup_datetime").dt.day().eq(1)
    )
    .group_by("VendorID").agg("total_amount")
    .fetch()
)

In [59]:
# another example more complex
(
    shrunk
    .filter(
        pl.col("tpep_pickup_datetime").dt.day().eq(1)
    )
    .group_by("VendorID", "payment_type")
    .agg("total_amount")
    .with_columns(
        pl.col("total_amount").list.eval(pl.element().rolling_mean(window_size=3))
    )
    .fetch()
)

VendorID,payment_type,total_amount
i8,i8,list[f32]
1,3,"[null, null, … 8.783334]"
2,2,"[null, null, … 41.16666]"
1,4,"[null, null, … 14.399999]"
1,2,"[null, null, … 15.799995]"
2,1,"[null, null, … 17.160069]"
2,3,"[null, null, … 3.6]"
1,1,"[null, null, … 21.913385]"
2,4,"[null, null, … 5.233333]"
