#### Top

# Chapter 9 Being Lazy and Streaming

* [9.0 Imports and Setup](#9.0-Imports-and-Setup)
* [9.1 Loading Data](#9.1-Loading-Data)
* [9.2 The replace Method](#9.2-The-replace-Method)
* [9.3 Lazy Version v1](#9.3-Lazy-Version-v1)
* [9.4 Fetching Data](#9.4-Fetching-Data)
* [9.5 Pandas Comparison](#9.5-Pandas-Comparison)
* [9.6 Viewing Plans](#9.6-Viewing-Plans)
* [9.7 Streaming](#9.7-Streaming)



---
# 9.0 Imports and Setup

[back to Top](#Top)

In [1]:
import polars as pl
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib_inline.backend_inline
import chardet
import pprint as pp
import hvplot.polars
hvplot.extension('matplotlib')

matplotlib_inline.backend_inline.set_matplotlib_formats("retina")
pd.options.mode.copy_on_write = True
print(pd.options.mode.copy_on_write)
pl.Config.set_verbose(True)
pl.show_versions()

def HR():
    print("-"*40)

@pl.Config(tbl_cols=-1, ascii_tables=True)
def tight_layout(df: pl.DataFrame, n=5) -> None:
    with pl.Config(tbl_cols=-1, fmt_str_lengths=n):
        print(df)

def detect_encoding(filename: str) -> str:
    """Return the most probable character encoding for a file."""
    with open(filename, "rb") as f:
        raw_data = f.read()
        result = chardet.detect(raw_data)
        return result["encoding"]

True
--------Version info---------
Polars:              1.9.0
Index type:          UInt32
Platform:            macOS-12.7.6-x86_64-i386-64bit
Python:              3.11.5 (main, Jan 16 2024, 17:25:53) [Clang 14.0.0 (clang-1400.0.29.202)]

----Optional dependencies----
adbc_driver_manager  1.1.0
altair               5.4.0
cloudpickle          3.0.0
connectorx           0.3.3
deltalake            0.19.1
fastexcel            0.11.6
fsspec               2023.12.2
gevent               24.2.1
great_tables         0.10.0
matplotlib           3.9.2
nest_asyncio         1.6.0
numpy                2.0.2
openpyxl             3.1.5
pandas               2.2.2
pyarrow              17.0.0
pydantic             2.8.2
pyiceberg            0.6.1
sqlalchemy           2.0.32
torch                <not installed>
xlsx2csv             0.8.3
xlsxwriter           3.2.0


---
# 9.1 Loading Data
[back to Top](#Top)

* Use the Fuel Economy dataset from the EPA.
* Summarize the yearly city mileage by country of origin.
* Create a new column called *origin* that is the country of origin.
* Filter out unknown origins and years after 2019.
* Select a few of the 84 columns to keep.
* Group the results by origin and year and compute the mean city mileage.
* Pivot the data so each country is a column and years are rows.
* Sort the data by year.

In [2]:
# Build a Polars when/then expression that maps each make to its country of origin
def make_to_origin_expr(make_col):
    origin_dict = {
        'Chevrolet': 'USA',
        'Ford': 'USA',
        'Dodge': 'USA',
        'GMC': 'USA',
        'Toyota': 'Japan',
        'BMW': 'Germany',
        'Mercedes-Benz': 'Germany',
        'Nissan': 'Japan',
        'Volkswagen': 'Germany',
        'Mitsubishi': 'Japan',
        'Porsche': 'Germany',
        'Mazda': 'Japan',
        'Audi': 'Germany',
        'Honda': 'Japan',
        'Jeep': 'USA',
        'Pontiac': 'USA',
        'Subaru': 'Japan',
        'Volvo': 'Sweden',
        'Hyundai': 'South Korea',
        'Chrysler': 'USA',
        'Tesla': 'USA'
    }
    expr = None
    col = pl.col(make_col)
    for k, v in origin_dict.items():
        if expr is None:
            expr = pl.when(col == k).then(pl.lit(v))
        else:
            expr = expr.when(col == k).then(pl.lit(v))
    expr = expr.otherwise(pl.lit('Unknown'))
    return expr

In [3]:
df_pl = pl.read_csv(
    './data/vehicles.csv',
    null_values='NA'
)
df_pl.head(3)

avg line length: 434.78027
std. dev. line length: 23.885818
initial row estimate: 47850
no. of chunks: 4 processed by: 4 threads.


barrels08,barrelsA08,charge120,charge240,city08,city08U,cityA08,cityA08U,cityCD,cityE,cityUF,co2,co2A,co2TailpipeAGpm,co2TailpipeGpm,comb08,comb08U,combA08,combA08U,combE,combinedCD,combinedUF,cylinders,displ,drive,engId,eng_dscr,feScore,fuelCost08,fuelCostA08,fuelType,fuelType1,ghgScore,ghgScoreA,highway08,highway08U,highwayA08,highwayA08U,highwayCD,highwayE,highwayUF,hlv,hpv,id,lv2,lv4,make,model,mpgData,phevBlended,pv2,pv4,range,rangeCity,rangeCityA,rangeHwy,rangeHwyA,trany,UCity,UCityA,UHighway,UHighwayA,VClass,year,youSaveSpend,baseModel,guzzler,trans_dscr,tCharger,sCharger,atvType,fuelType2,rangeA,evMotor,mfrCode,c240Dscr,charge240b,c240bDscr,createdOn,modifiedOn,startStop,phevCity,phevHwy,phevComb
f64,f64,f64,f64,i64,f64,i64,f64,f64,f64,f64,i64,i64,f64,f64,i64,f64,i64,f64,f64,f64,f64,i64,f64,str,i64,str,i64,i64,i64,str,str,i64,i64,i64,f64,i64,f64,f64,f64,f64,i64,i64,i64,i64,i64,str,str,str,bool,i64,i64,i64,f64,f64,f64,f64,str,f64,f64,f64,f64,str,i64,i64,str,str,str,str,str,str,str,str,str,str,str,f64,str,str,str,str,i64,i64,i64
14.167143,0.0,0.0,0.0,19,0.0,0,0.0,0.0,0.0,0.0,-1,-1,0.0,423.190476,21,0.0,0,0.0,0.0,0.0,0.0,4,2.0,"""Rear-Wheel Drive""",9011,"""(FFS)""",-1,2250,0,"""Regular""","""Regular Gasoline""",-1,-1,25,0.0,0,0.0,0.0,0.0,0.0,0,0,1,0,0,"""Alfa Romeo""","""Spider Veloce 2000""","""Y""",False,0,0,0,0.0,0.0,0.0,0.0,"""Manual 5-spd""",23.3333,0.0,35.0,0.0,"""Two Seaters""",1985,-2750,"""Spider""",,,,,,,,,,,0.0,,"""Tue Jan 01 00:00:00 EST 2013""","""Tue Jan 01 00:00:00 EST 2013""",,0,0,0
27.046364,0.0,0.0,0.0,9,0.0,0,0.0,0.0,0.0,0.0,-1,-1,0.0,807.909091,11,0.0,0,0.0,0.0,0.0,0.0,12,4.9,"""Rear-Wheel Drive""",22020,"""(GUZZLER)""",-1,4300,0,"""Regular""","""Regular Gasoline""",-1,-1,14,0.0,0,0.0,0.0,0.0,0.0,0,0,10,0,0,"""Ferrari""","""Testarossa""","""N""",False,0,0,0,0.0,0.0,0.0,0.0,"""Manual 5-spd""",11.0,0.0,19.0,0.0,"""Two Seaters""",1985,-13000,"""Testarossa""","""T""",,,,,,,,,,0.0,,"""Tue Jan 01 00:00:00 EST 2013""","""Tue Jan 01 00:00:00 EST 2013""",,0,0,0
11.018889,0.0,0.0,0.0,23,0.0,0,0.0,0.0,0.0,0.0,-1,-1,0.0,329.148148,27,0.0,0,0.0,0.0,0.0,0.0,4,2.2,"""Front-Wheel Drive""",2100,"""(FFS)""",-1,1750,0,"""Regular""","""Regular Gasoline""",-1,-1,33,0.0,0,0.0,0.0,0.0,0.0,19,77,100,0,0,"""Dodge""","""Charger""","""Y""",False,0,0,0,0.0,0.0,0.0,0.0,"""Manual 5-spd""",29.0,0.0,47.0,0.0,"""Subcompact Cars""",1985,-250,"""Charger""",,"""SIL""",,,,,,,,,0.0,,"""Tue Jan 01 00:00:00 EST 2013""","""Tue Jan 01 00:00:00 EST 2013""",,0,0,0


In [4]:
result = (
    df_pl
    .with_columns(
        pl.col('createdOn')
            .str.to_datetime('%a %b %d %H:%M:%S %Z %Y'),
        origin=make_to_origin_expr('make')
    )
    .filter(
        (pl.col('origin')!="Unknown") & (pl.col('year')<2020)
    )
    .select([
        'make', 'model', 'year', 'city08', 'highway08', 'origin', 'createdOn'
    ])
    .group_by(['origin', 'year'])
    .agg(avg_city08=pl.col('city08').mean())
    .pivot(
        index='year', 
        on='origin', 
        values='avg_city08'
    )
    .sort('year')
)
result

dataframe filtered
dataframe filtered
estimated unique values: 356
run PARTITIONED HASH AGGREGATION


year,Sweden,USA,South Korea,Japan,Germany
i64,f64,f64,f64,f64,f64
1984,18.540541,16.458456,,21.705882,20.06338
1985,17.529412,16.576883,,21.533333,18.175676
1986,19.0,16.424023,24.0,20.606383,18.072464
1987,17.5625,16.042879,24.0,20.085202,17.724138
1988,17.714286,16.085179,23.333333,19.919283,16.5
…,…,…,…,…,…
2015,20.666667,21.473016,22.75,23.891192,20.730556
2016,21.111111,24.283951,23.711111,26.324022,20.75
2017,22.272727,23.414634,29.295455,26.353488,21.09375
2018,22.740741,22.990769,29.674419,26.106481,20.767045


* Time this code with `%%timeit`.
* Wrap the code in a function.
* Use the `pl.Config` decorator to temporarily turn off verbose mode.

In [5]:
%%timeit
@pl.Config(set_verbose=False)
def timing_test():
    df_pl = pl.read_csv('./data/vehicles.csv', null_values='NA')
    
    result = (
        df_pl
        .with_columns(
            pl.col('createdOn')
                .str.to_datetime('%a %b %d %H:%M:%S %Z %Y'),
            origin=make_to_origin_expr('make')
        )
        .filter(
            (pl.col('origin')!="Unknown") & (pl.col('year')<2020)
        )
        .select([
            'make', 'model', 'year', 'city08', 'highway08', 'origin', 'createdOn'
        ])
        .group_by(['origin', 'year'])
        .agg(avg_city08=pl.col('city08').mean())
        .pivot(
            index='year', 
            on='origin', 
            values='avg_city08'
        )
        .sort('year')
    )
    result

timing_test()

125 ms ± 13 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


---
# 9.2 The replace Method
[back to Top](#Top)

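* `polars.Expr.replace()` is a dedicated Polars method for replacing values in a column.
* The function below uses the related `replace_strict()`, which accepts a `default` for values that are not in the mapping.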
* https://docs.pola.rs/py-polars/html/reference/expressions/api/polars.Expr.replace.html#polars.Expr.replace

In [6]:
def make_to_origin_replace(make_col):
    origin_dict = {
        'Chevrolet': 'USA',
        'Ford': 'USA',
        'Dodge': 'USA',
        'GMC': 'USA',
        'Toyota': 'Japan',
        'BMW': 'Germany',
        'Mercedes-Benz': 'Germany',
        'Nissan': 'Japan',
        'Volkswagen': 'Germany',
        'Mitsubishi': 'Japan',
        'Porsche': 'Germany',
        'Mazda': 'Japan',
        'Audi': 'Germany',
        'Honda': 'Japan',
        'Jeep': 'USA',
        'Pontiac': 'USA',
        'Subaru': 'Japan',
        'Volvo': 'Sweden',
        'Hyundai': 'South Korea',
        'Chrysler': 'USA',
        'Tesla': 'USA'
    }
    return make_col.replace_strict(
        origin_dict,
        default='Unknown'
    )

In [7]:
df_pl = pl.read_csv('./data/vehicles.csv', null_values='NA')

result_pl_eager = (
    df_pl
    .with_columns(
        pl.col('createdOn')
            .str.to_datetime('%a %b %d %H:%M:%S %Z %Y'),
        origin=make_to_origin_replace(pl.col('make'))
    )
    .filter(
        (pl.col('origin')!="Unknown") & (pl.col('year')<2020)
    )
    .select([
        'make', 'model', 'year', 'city08', 'highway08', 'origin', 'createdOn'
    ])
    .group_by(['origin', 'year'])
    .agg(avg_city08=pl.col('city08').mean())
    .sort('origin')
    .pivot(
        index='year', 
        on='origin', 
        values='avg_city08'
    )
    .sort('year')
)
result_pl_eager.head(3)

avg line length: 434.78027
std. dev. line length: 23.885818
initial row estimate: 47850
no. of chunks: 4 processed by: 4 threads.
dataframe filtered
dataframe filtered
estimated unique values: 356
run PARTITIONED HASH AGGREGATION


year,Germany,Japan,South Korea,Sweden,USA
i64,f64,f64,f64,f64,f64
1984,20.06338,21.705882,,18.540541,16.458456
1985,18.175676,21.533333,,17.529412,16.576883
1986,18.072464,20.606383,24.0,19.0,16.424023


In [8]:
%%timeit
@pl.Config(set_verbose=False)
def timing_test():
    df_pl = pl.read_csv('./data/vehicles.csv', null_values='NA')
    
    result_pl_eager = (
        df_pl
        .with_columns(
            pl.col('createdOn')
                .str.to_datetime('%a %b %d %H:%M:%S %Z %Y'),
            origin=make_to_origin_replace(pl.col('make'))
        )
        .filter(
            (pl.col('origin')!="Unknown") & (pl.col('year')<2020)
        )
        .select([
            'make', 'model', 'year', 'city08', 'highway08', 'origin', 'createdOn'
        ])
        .group_by(['origin', 'year'])
        .agg(avg_city08=pl.col('city08').mean())
        .sort('origin')
        .pivot(
            index='year', 
            on='origin', 
            values='avg_city08'
        )
        .sort('year')
    )
    
    result_pl_eager

timing_test()

121 ms ± 13 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


---
# 9.3 Lazy Version v1
[back to Top](#Top)

* With lazy evaluation, code is not executed until we call `collect()`.
* When we use `.collect()`, Polars searches for the most efficient way to execute the code.
* Here, switch `pl.read_csv()` with `pl.scan_csv()`.
* This returns a lazy dataframe.

In [9]:
df_pl_lazy = pl.scan_csv('./data/vehicles.csv', null_values='NA')

try: 
    result = (
        df_pl_lazy
        .with_columns(
            pl.col('createdOn')
                .str.to_datetime('%a %b %d %H:%M:%S %Z %Y'),
            origin=make_to_origin_replace(pl.col('make'))
        )
        .filter(
            (pl.col('origin')!="Unknown") & (pl.col('year')<2020)
        )
        .select([
            'make', 'model', 'year', 'city08', 'highway08', 'origin', 'createdOn'
        ])
        .group_by(['origin', 'year'])
        .agg(avg_city08=pl.col('city08').mean())
        .pivot(
            index='year', 
            on='origin', 
            values='avg_city08'
        )
        .sort('year')
    )
except Exception as e:
    print(e)

'LazyFrame' object has no attribute 'pivot'


* We can see that `.pivot()` is not supported on a lazy dataframe.
* We can run this code lazily up to the pivot operation.

In [10]:
df_pl_lazy = pl.scan_csv('./data/vehicles.csv', null_values='NA')

result_pl_lazy = (
    df_pl_lazy
    .with_columns(
        pl.col('createdOn')
            .str.to_datetime('%a %b %d %H:%M:%S %Z %Y'),
        origin=make_to_origin_replace(pl.col('make'))
    )
    .filter(
        (pl.col('origin')!="Unknown") & (pl.col('year')<2020)
    )
    .select([
        'make', 'model', 'year', 'city08', 'highway08', 'origin', 'createdOn'
    ])
    .group_by(['origin', 'year'])
    .agg(avg_city08=pl.col('city08').mean())
    .collect()
    .sort('origin')
    .pivot(
        index='year', 
        on='origin', 
        values='avg_city08'
    )
    .sort('year')
)
result_pl_lazy

read files in parallel
avg line length: 434.78027
std. dev. line length: 23.885818
initial row estimate: 47850
no. of chunks: 4 processed by: 4 threads.
dataframe filtered
estimated unique values: 356
run PARTITIONED HASH AGGREGATION


year,Germany,Japan,South Korea,Sweden,USA
i64,f64,f64,f64,f64,f64
1984,20.06338,21.705882,,18.540541,16.458456
1985,18.175676,21.533333,,17.529412,16.576883
1986,18.072464,20.606383,24.0,19.0,16.424023
1987,17.724138,20.085202,24.0,17.5625,16.042879
1988,16.5,19.919283,23.333333,17.714286,16.085179
…,…,…,…,…,…
2015,20.730556,23.891192,22.75,20.666667,21.473016
2016,20.75,26.324022,23.711111,21.111111,24.283951
2017,21.09375,26.353488,29.295455,22.272727,23.414634
2018,20.767045,26.106481,29.674419,22.740741,22.990769


In [11]:
%%timeit
@pl.Config(set_verbose=False)
def timing_test():
    df_pl_lazy = pl.scan_csv('./data/vehicles.csv', null_values='NA')
    
    result_pl_lazy = (
        df_pl_lazy
        .with_columns(
            pl.col('createdOn')
                .str.to_datetime('%a %b %d %H:%M:%S %Z %Y'),
            origin=make_to_origin_replace(pl.col('make'))
        )
        .filter(
            (pl.col('origin')!="Unknown") & (pl.col('year')<2020)
        )
        .select([
            'make', 'model', 'year', 'city08', 'highway08', 'origin', 'createdOn'
        ])
        .group_by(['origin', 'year'])
        .agg(avg_city08=pl.col('city08').mean())
        .collect()
        .sort('origin')
        .pivot(
            index='year', 
            on='origin', 
            values='avg_city08'
        )
        .sort('year')
    )
    result_pl_lazy

timing_test()

28 ms ± 1.74 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


---
# 9.4 Fetching Data
[back to Top](#Top)

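* `.fetch()` gets a chunk of data from a lazy dataframe.
* Unlike `.collect()`, which gets all the data, `.fetch()` runs the query on a specified number of rows.
* `.fetch()` is deprecated, so the cell below uses `.collect().head(5)` instead.
* `.head(5)` keeps 5 rows of the aggregated result, which is why the pivoted table that follows is sparse.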

In [12]:
df_pl_lazy = pl.scan_csv('./data/vehicles.csv', null_values='NA')

result_fetch_lazy = (
    df_pl_lazy
    .with_columns(
        pl.col('createdOn')
            .str.to_datetime('%a %b %d %H:%M:%S %Z %Y'),
        origin=make_to_origin_replace(pl.col('make'))
    )
    .filter(
        (pl.col('origin')!="Unknown") & (pl.col('year')<2020)
    )
    .select([
        'make', 'model', 'year', 'city08', 'highway08', 'origin', 'createdOn'
    ])
    .group_by(['origin', 'year'])
    .agg(avg_city08=pl.col('city08').mean())

    #.fetch(5) # deprecated
    .collect().head(5) # suggested, LazyFrame.collect().head(n)

    .pivot(
        index='year', 
        on='origin', 
        values='avg_city08'
    )
    .sort('year')
)
result_fetch_lazy

read files in parallel
avg line length: 434.78027
std. dev. line length: 23.885818
initial row estimate: 47850
no. of chunks: 4 processed by: 4 threads.
dataframe filtered
estimated unique values: 356
run PARTITIONED HASH AGGREGATION


year,Sweden,South Korea,Germany,USA
i64,f64,f64,f64,f64
1993,,,15.873563,15.345212
2002,,19.210526,,
2012,18.4,,,
2014,18.076923,,,


---
# 9.5 Pandas Comparison

[back to Top](#Top)

In [13]:
import pandas as pd

def make_to_origin(make):
    origin_dict = {
        'Chevrolet': 'USA',
        'Ford': 'USA',
        'Dodge': 'USA',
        'GMC': 'USA',
        'Toyota': 'Japan',
        'BMW': 'Germany',
        'Mercedes-Benz': 'Germany',
        'Nissan': 'Japan',
        'Volkswagen': 'Germany',
        'Mitsubishi': 'Japan',
        'Porsche': 'Germany',
        'Mazda': 'Japan',
        'Audi': 'Germany',
        'Honda': 'Japan',
        'Jeep': 'USA',
        'Pontiac': 'USA',
        'Subaru': 'Japan',
        'Volvo': 'Sweden',
        'Hyundai': 'South Korea',
        'Chrysler': 'USA',
        'Tesla': 'USA'
    }
    return origin_dict.get(make, 'Unknown')

df_pd = pd.read_csv(
    './data/vehicles.csv',
    engine='pyarrow',
    dtype_backend='pyarrow'
)

result_pandas = (
    df_pd
    .assign(origin=lambda df: df['make'].apply(make_to_origin),
           # replace EST and EDT with offset in createdOn
            createdOn=lambda df: df['createdOn']
                .str.replace('EDT', '-04:00').str.replace('EST', '-05:00')
           )
    .assign(
        createdOn=lambda df: pd.to_datetime(df['createdOn'],
                format='%a %b %d %H:%M:%S %z %Y', utc=True),
    )
    .query('origin != "Unknown" and year < 2020')
    .loc[:, ['make', 'model', 'year', 'city08', 'highway08', 'origin', 'createdOn']]
    .groupby(['origin', 'year'])
    .city08
    .mean()
    .unstack('origin')
)
result_pandas



origin,Germany,Japan,South Korea,Sweden,USA
year,,,,,
1984,20.06338,21.705882,,18.540541,16.458456
1985,18.175676,21.533333,,17.529412,16.576883
1986,18.072464,20.606383,24.0,19.0,16.424023
1987,17.724138,20.085202,24.0,17.5625,16.042879
1988,16.5,19.919283,23.333333,17.714286,16.085179
1989,16.307692,19.519048,20.333333,17.2,15.73913
1990,16.493333,19.47032,21.0,17.272727,15.344168
1991,16.036364,19.204444,22.0,17.631579,15.21328
1992,15.625,19.040909,21.5,17.538462,15.253165
1993,15.873563,19.043668,21.083333,17.25,15.345212


In [14]:
%%timeit
@pl.Config(set_verbose=False)
def timing_test():
    
    def make_to_origin(make):
        origin_dict = {
            'Chevrolet': 'USA',
            'Ford': 'USA',
            'Dodge': 'USA',
            'GMC': 'USA',
            'Toyota': 'Japan',
            'BMW': 'Germany',
            'Mercedes-Benz': 'Germany',
            'Nissan': 'Japan',
            'Volkswagen': 'Germany',
            'Mitsubishi': 'Japan',
            'Porsche': 'Germany',
            'Mazda': 'Japan',
            'Audi': 'Germany',
            'Honda': 'Japan',
            'Jeep': 'USA',
            'Pontiac': 'USA',
            'Subaru': 'Japan',
            'Volvo': 'Sweden',
            'Hyundai': 'South Korea',
            'Chrysler': 'USA',
            'Tesla': 'USA'
        }
        return origin_dict.get(make, 'Unknown')
    
    df_pd = pd.read_csv(
        './data/vehicles.csv',
        engine='pyarrow',
        dtype_backend='pyarrow'
    )
    
    result_pandas = (
        df_pd
        .assign(origin=lambda df: df['make'].apply(make_to_origin),
               # replace EST and EDT with offset in createdOn
                createdOn=lambda df: df['createdOn']
                    .str.replace('EDT', '-04:00').str.replace('EST', '-05:00')
               )
        .assign(
            createdOn=lambda df: pd.to_datetime(df['createdOn'],
                    format='%a %b %d %H:%M:%S %z %Y', utc=True),
        )
        .query('origin != "Unknown" and year < 2020')
        .loc[:, ['make', 'model', 'year', 'city08', 'highway08', 'origin', 'createdOn']]
        .groupby(['origin', 'year'])
        .city08
        .mean()
        .unstack('origin')
    )
    result_pandas

timing_test()



302 ms ± 6.41 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)




In [15]:
result_pl_eager.head(3)

year,Germany,Japan,South Korea,Sweden,USA
i64,f64,f64,f64,f64,f64
1984,20.06338,21.705882,,18.540541,16.458456
1985,18.175676,21.533333,,17.529412,16.576883
1986,18.072464,20.606383,24.0,19.0,16.424023


In [16]:
result_pl_lazy.head(3)

year,Germany,Japan,South Korea,Sweden,USA
i64,f64,f64,f64,f64,f64
1984,20.06338,21.705882,,18.540541,16.458456
1985,18.175676,21.533333,,17.529412,16.576883
1986,18.072464,20.606383,24.0,19.0,16.424023


In [17]:
result_pandas.head(3)

origin,Germany,Japan,South Korea,Sweden,USA
year,,,,,
1984,20.06338,21.705882,,18.540541,16.458456
1985,18.175676,21.533333,,17.529412,16.576883
1986,18.072464,20.606383,24.0,19.0,16.424023


Rough results (timings vary from run to run):

    Polars eager    127.0 ms ± 12.1 ms per loop
    Polars lazy      30.5 ms ± 4.62 ms per loop
    Pandas          289.0 ms ± 3.74 ms per loop
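
To put the rough results in relative terms, the ratios can be worked out directly. The numbers are the three means listed above; the variable names are illustrative.

```python
# Mean runtimes in milliseconds, copied from the rough results.
timings_ms = {"Polars eager": 127.0, "Polars lazy": 30.5, "Pandas": 289.0}

# Express each runtime as a multiple of the lazy Polars runtime.
baseline = timings_ms["Polars lazy"]
speedups = {name: ms / baseline for name, ms in timings_ms.items()}

for name, ratio in speedups.items():
    print(f"{name:<14}{ratio:5.1f}x the lazy runtime")
```

On these figures the eager Polars version takes roughly 4 times as long as the lazy one, and the Pandas version roughly 9 to 10 times as long.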

---
# 9.6 Viewing Plans
[back to Top](#Top)

* Examine what Polars did to make the code run faster via the query plan.
* This is a tree of operations that Polars will execute.
* View this by inspecting the lazy object.

In [18]:
df_pl_lazy = pl.scan_csv('./data/vehicles.csv', null_values='NA')

print (
    df_pl_lazy
    .with_columns(
        pl.col('createdOn')
            .str.to_datetime('%a %b %d %H:%M:%S %Z %Y'),
        origin=make_to_origin_replace(pl.col('make'))
    )
    .filter(
        (pl.col('origin')!="Unknown") & (pl.col('year')<2020)
    )
    .select([
        'make', 'model', 'year', 'city08', 'highway08', 'origin', 'createdOn'
    ])
    .group_by(['origin', 'year'])
    .agg(avg_city08=pl.col('city08').mean())
)

naive plan: (run LazyFrame.explain(optimized=True) to see the optimized plan)

AGGREGATE
	[col("city08").mean().alias("avg_city08")] BY [col("origin"), col("year")] FROM
   SELECT [col("make"), col("model"), col("year"), col("city08"), col("highway08"), col("origin"), col("createdOn")] FROM
    FILTER [(col("origin")) != (String(Unknown))] FROM
      FILTER [(col("year")) < (2020)] FROM
         WITH_COLUMNS:
         [col("createdOn").str.strptime([String(raise)]), col("make").replace_strict([Series, Series, String(Unknown)]).alias("origin")] 
          Csv SCAN [./data/vehicles.csv]
          PROJECT */84 COLUMNS


* This says it is a *naive* plan.
* These are the operations that would run, in this order, if we executed the code eagerly.
* We read the plan from the bottom up.
* The first operation scans the CSV file and projects all 84 columns (`PROJECT */84`).
* Then it parses *createdOn* and makes a new *origin* column (WITH_COLUMNS).
* Then it filters rows, selects a few columns, and does the group by and mean.

---
* If we use `.explain()`, Polars will show us the optimized query plan for the entire chain of operations.

In [19]:
df_pl_lazy = pl.scan_csv('./data/vehicles.csv', null_values='NA')

print (
    df_pl_lazy
    .with_columns(
        pl.col('createdOn')
            .str.to_datetime('%a %b %d %H:%M:%S %Z %Y'),
        origin=make_to_origin_replace(pl.col('make'))
    )
    .filter(
        (pl.col('origin')!="Unknown") & (pl.col('year')<2020)
    )
    .select([
        'make', 'model', 'year', 'city08', 'highway08', 'origin', 'createdOn'
    ])
    .group_by(['origin', 'year'])
    .agg(avg_city08=pl.col('city08').mean())
    .explain()
)

AGGREGATE
	[col("city08").mean().alias("avg_city08")] BY [col("origin"), col("year")] FROM
  simple π 3/4 ["year", "city08", "origin"]
    FILTER [([(col("year")) < (2020)]) & ([(col("origin")) != (String(Unknown))])] FROM
       WITH_COLUMNS:
       [col("make").replace_strict([Series, Series, String(Unknown)]).alias("origin")] 
        Csv SCAN [./data/vehicles.csv]
        PROJECT 3/84 COLUMNS


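* We see that the PROJECT says "3/84".
* This means Polars reads only 3 of the 84 columns from the CSV file: *make*, *year*, and *city08*.
* The *createdOn* conversion is gone from WITH_COLUMNS because nothing downstream uses that column.
* The two FILTER steps of the naive plan are combined into a single FILTER.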

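The same two plans (naive, then optimized) from a run in which the mapping used `replace` instead of `replace_strict`:

<font size=2>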
    
``` 
AGGREGATE
    [col("city08").mean().alias("avg_city08")] BY [col("origin"), col("year")] FROM
   SELECT [col("make"), col("model"), col("year"), col("city08"), col("highway08"), col("origin"), col("createdOn")] FROM
    FILTER [([(col("origin")) != (String(Unknown))]) & ([(col("year")) < (dyn int: 2020)])] FROM
     WITH_COLUMNS:
[col("createdOn").str.strptime([String(raise)]), col("make").replace([Series, Series, String(Unknown)]).alias("origin")]
        Csv SCAN ./data/vehicles.csv
        PROJECT */? COLUMNS
---
AGGREGATE
    [col("city08").mean().alias("avg_city08")] BY [col("origin"), col("year")] FROM
  SIMPLE_PROJECTION 
    FILTER [([(col("origin")) != (String(Unknown))]) & ([(col("year")) < (2020)])] FROM
     WITH_COLUMNS:
     [col("make").replace([Series, Series, String(Unknown)]).alias("origin")], [] 
        Csv SCAN ./data/vehicles.csv
        PROJECT 3/84 COLUMNS
```
</font>


---
# 9.7 Streaming
[back to Top](#Top)

* Polars also supports streaming data from disk.
* It reads a chunk of data from disk, processes it, then reads the next chunk.
* To use streaming, use lazy dataframes and pass the `streaming=True` parameter to `collect()`.
* Simulate a large file by loading the same file multiple times.
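
The chunk-by-chunk idea can be illustrated in plain Python. This is a conceptual sketch only, not how Polars implements streaming: it keeps running sums and counts per group, so the full dataset never has to be held in memory at once. The function names and toy rows are assumptions.

```python
from collections import defaultdict
from itertools import islice
from typing import Iterable, Iterator


def chunks(rows: Iterable[tuple[str, int]], size: int) -> Iterator[list[tuple[str, int]]]:
    """Yield successive batches of at most `size` rows."""
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch


def streaming_mean(rows: Iterable[tuple[str, int]], chunk_size: int) -> dict[str, float]:
    """Grouped mean computed one chunk at a time."""
    totals: dict[str, float] = defaultdict(float)
    counts: dict[str, int] = defaultdict(int)
    for batch in chunks(rows, chunk_size):
        # Only the current batch and the small per-group state are in memory.
        for origin, city08 in batch:
            totals[origin] += city08
            counts[origin] += 1
    return {origin: totals[origin] / counts[origin] for origin in totals}


rows = [("USA", 10), ("Japan", 30), ("USA", 20), ("Japan", 40), ("USA", 30)]
print(streaming_mean(rows, chunk_size=2))
```

A mean works well here because it can be rebuilt from partial sums and counts; operations that need all rows at once are harder to stream.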

---

GB: We can create a test `vehicles_large.csv` file. Concatenating `vehicles.csv` with itself 400 times gives a file of about 8 GB.

<font size=2>

```bash
    #!/bin/bash
    
    # Set the input file
    input_file="vehicles.csv"
    
    # Set the output file
    output_file="vehicles_large.csv"
    
    # Set the number of times to concatenate
    num_times=400
    
    # Write the header row once
    head -n 1 "$input_file" > "$output_file"
    
    # Append the data rows (without the header) to the output file n times
    for ((i=0; i<num_times; i++)); do
        tail -n +2 "$input_file" >> "$output_file"
    done

```
</font>


In [20]:
# If read 400x in eager mode, required memory is 11.28GB.
# factor=100 keeps this demonstration smaller.
@pl.Config(set_verbose=False)
def test():
    factor=100
    # .collect() runs the scan and returns an eager DataFrame
    df_pl = pl.scan_csv(
        ['./data/vehicles.csv']*factor,
        null_values='NA'
    ).collect()
    
    print(f"{df_pl.estimated_size('mb'):.2f}MB")
    print(f"{df_pl.estimated_size('gb'):.2f}GB")
    
test()

2933.51MB
2.86GB


In [21]:
factor=10
df_pl_lazy = pl.scan_csv(
    ['./data/vehicles.csv']*factor,
    null_values='NA'
)

query = (
    df_pl_lazy
    .with_columns(
        pl.col('createdOn').str.to_datetime('%a %b %d %H:%M:%S %Z %Y'),
        origin=make_to_origin_replace(pl.col('make')) # 54.5 ms, Polars replace_strict
        # origin = pl.col('make').map_elements(make_to_origin) # 143 ms, Python function applied per element
        # origin = make_to_origin_expr('make') # 57.3 ms, Polars when/then chain
    )
    .filter(
        (pl.col("origin") != "Unknown") & (pl.col("year")<2020)
    )
    .select(['make', 'model', 'year', 'city08', 'highway08', 'origin', 'createdOn'])
    .group_by(['origin', 'year'])
    .agg(avg_city08=pl.col('city08').mean())
    .sort(['origin', 'year'])
    
    # .pivot(
    #     index='year', 
    #     on='origin', 
    #     values='avg_city08'
    # )
)

query.collect(streaming=True)

RUN STREAMING PIPELINE
[csv -> hstack -> filter -> fast_projection -> generic-group_by -> sort_multiple -> ordered_sink]
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
finish streaming aggregation with local in-memory table


origin,year,avg_city08
str,i64,f64
"""Germany""",1984,20.06338
"""Germany""",1985,18.175676
"""Germany""",1986,18.072464
"""Germany""",1987,17.724138
"""Germany""",1988,16.5
…,…,…
"""USA""",2015,21.473016
"""USA""",2016,24.283951
"""USA""",2017,23.414634
"""USA""",2018,22.990769



`.pivot()` is not available on a lazy dataframe, so collect the query first and chain `.pivot()` onto the resulting eager dataframe at the very end.

In [22]:
(
    query
    .collect(streaming=True)
    .pivot(
        index='year', 
        on='origin', 
        values='avg_city08')
)

RUN STREAMING PIPELINE
[csv -> hstack -> filter -> fast_projection -> generic-group_by -> sort_multiple -> ordered_sink]
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
STREAMING CHUNK SIZE: 49998 rows
finish streaming aggregation with local in-memory table


year,Germany,Japan,South Korea,Sweden,USA
i64,f64,f64,f64,f64,f64
1984,20.06338,21.705882,,18.540541,16.458456
1985,18.175676,21.533333,,17.529412,16.576883
1986,18.072464,20.606383,24.0,19.0,16.424023
1987,17.724138,20.085202,24.0,17.5625,16.042879
1988,16.5,19.919283,23.333333,17.714286,16.085179
…,…,…,…,…,…
2015,20.730556,23.891192,22.75,20.666667,21.473016
2016,20.75,26.324022,23.711111,21.111111,24.283951
2017,21.09375,26.353488,29.295455,22.272727,23.414634
2018,20.767045,26.106481,29.674419,22.740741,22.990769



* Assign a lazy dataframe to a variable (e.g. `query`) for easier inspection later.
* https://www.rhosignal.com/posts/streaming-in-polars/

In [23]:
print(query.explain(streaming=False))

SORT BY [col("origin"), col("year")]
  AGGREGATE
  	[col("city08").mean().alias("avg_city08")] BY [col("origin"), col("year")] FROM
    simple π 3/4 ["year", "city08", "origin"]
      FILTER [([(col("year")) < (2020)]) & ([(col("origin")) != (String(Unknown))])] FROM
         WITH_COLUMNS:
         [col("make").replace_strict([Series, Series, String(Unknown)]).alias("origin")] 
          Csv SCAN [./data/vehicles.csv, ... 9 other sources]
          PROJECT 3/84 COLUMNS


In [24]:
# Run parts of the query in a streaming fashion (this is in an alpha state)
# https://docs.pola.rs/py-polars/html/reference/lazyframe/api/polars.LazyFrame.explain.html
print(query.explain(streaming=True))

STREAMING:
  SORT BY [col("origin"), col("year")]
    AGGREGATE
    	[col("city08").mean().alias("avg_city08")] BY [col("origin"), col("year")] FROM
      simple π 3/4 ["year", "city08", "origin"]
        FILTER [([(col("year")) < (2020)]) & ([(col("origin")) != (String(Unknown))])] FROM
           WITH_COLUMNS:
           [col("make").replace_strict([Series, Series, String(Unknown)]).alias("origin")] 
            Csv SCAN [./data/vehicles.csv, ... 9 other sources]
            PROJECT 3/84 COLUMNS


---

 **How is the chunk size calculated?**

A key parameter affecting performance in streaming mode is the chunk size. The chunk size is the number of rows that Polars processes in each batch. In the verbose output of the streaming query above, the chunk size is 49998 rows. This value is set by Polars based on:

    1. the number of threads Polars will use
    2. the number of columns in the dataset


* https://www.rhosignal.com/posts/streaming-chunk-sizes/#how-is-the-chunk-size-calculated
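
As a rough sketch, the calculation can be written out as below. The formula (a base of 50,000 rows divided by the column count, scaled by a thread factor of 12 divided by the thread count, with a floor of 1,000 rows) is an assumption taken from the linked post rather than from the Polars documentation, and it may change between versions. The function name `estimate_chunk_size` is hypothetical. With 4 threads and the 3 columns left after projection, it reproduces the 49998 rows printed in the streaming output.

```python
def estimate_chunk_size(n_threads: int, n_cols: int) -> int:
    """Estimate the streaming chunk size (assumed formula, see lead-in)."""
    # Fewer threads means larger chunks per thread.
    thread_factor = max(12 // n_threads, 1)
    # Wider tables get fewer rows per chunk; never go below 1,000 rows.
    return max(50_000 // n_cols * thread_factor, 1_000)


# 4 threads (pl.thread_pool_size()) and 3 projected columns.
print(estimate_chunk_size(n_threads=4, n_cols=3))
```

The integer division explains why the result is 49998 rather than a round 50000: 50,000 // 3 is 16666, and 16666 * 3 is 49998.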

* How many threads?

In [25]:
# Number of threads in the Polars thread pool
pl.thread_pool_size()

4