# outline
- reading csvs
- shrinking data
- writing paruqe files
- cloud

# imports

In [54]:
import timeit
import polars as pl
from polars import col as c # to be used instead of pl.col(...): df.filter(c("column_name")...)
import pyarrow.dataset as ds
import os

# get data from https://www.kaggle.com/datasets/robikscube/flight-delay-dataset-20182022

# Reading multiple csvs
- reminder: streaming is not magic.
- It's just a way to read the data in chunks, so you can process it in smaller parts.
- It's useful when you have a large dataset and you don't want to load it all at once in memory.
- otherwise it's just slower

### compare reading csvs time

In [2]:
# DO NOT RUN THIS CELL UNLESS YOU WANT TO WAIT FOR ~2 MINUTES
path = 'archive/Combined_Flights_*.csv'
dtypes={"DivAirportLandings": pl.Float64} # last file has a column with different types

def read_csv():
    return pl.read_csv(path ,dtypes=dtypes)
def scan_csv_non_streaming():
    return pl.scan_csv(path, dtypes=dtypes).collect()
def scan_csv_streaming():
    return pl.scan_csv(path ,dtypes=dtypes).collect(streaming=True) # theres no point in streaming if there's no filtering and the whole dataset fits into memory

read_csv_time = round(timeit.timeit(read_csv, number=5))
scan_csv_non_streaming_time = round(timeit.timeit(scan_csv_non_streaming, number=5))
scan_csv_streaming_time = round(timeit.timeit(scan_csv_streaming, number=5))


print(f"simple_read_csv: {read_csv_time} seconds")
print(f"scan_csv_non_streaming: {scan_csv_non_streaming_time} seconds")
print(f"scan_csv_streaming: {scan_csv_streaming_time} seconds")

simple_read_csv: 25 seconds
scan_csv_non_streaming: 25 seconds
scan_csv_streaming: 62 seconds


- simple_read_csv: 25 seconds
- scan_csv_non_streaming: 25 seconds
- scan_csv_streaming: 62 seconds

## cores and chunk size

In [7]:
scanned_df = pl.scan_csv(path)
scanned_df

In [2]:
# number of threads polars is using
pl.thread_pool_size()

10

- A reasonable use case for changing the thread_pool_size might be temporarily limiting the number of threads before importing Polars in a PySpark UDF or similar context.
- Otherwise, it is strongly recommended not to override this value as it will be set automatically by the engine.

In [None]:
# restart kernel
import os
os.environ["POLARS_MAX_THREADS"] = "1"
import polars as pl                    # polars should be imported after setting the environment variable
import timeit

print("pl.thread_pool_size():", pl.thread_pool_size())

In [1]:
def scan_csv_non_streaming():
    return pl.scan_csv(path, dtypes=dtypes).collect()

scan_csv_non_streaming_time = round(timeit.timeit(scan_csv_non_streaming, number=5))
print(f"scan_csv_non_streaming: {scan_csv_non_streaming_time} seconds")

## changing chunk size
[current formula for chunk size](https://github.com/pola-rs/polars/blob/3d7a5d92964b8c65c3c4e834e668b980845acb6d/polars/polars-lazy/polars-pipe/src/pipeline/mod.rs)

``` python
thread_factor = max(12 / n_threads, 1)
STREAMING_CHUNK_SIZE = max(50_000 / n_cols * thread_factor, 1000)
```
* For some datasets (esp. when you have large string elements) this can be too optimistic and lead to Out of Memory errors.

changing chunk size:
``` python
pl.Config.set_streaming_chunk_size(42000)
```


## playing with parquet

In [2]:
# reminder: restart kernel to reverse the thread_pool_size change
import polars as pl
df = pl.read_csv(path, dtypes=dtypes)
df.head(3)

FlightDate,Airline,Origin,Dest,Cancelled,Diverted,CRSDepTime,DepTime,DepDelayMinutes,DepDelay,ArrTime,ArrDelayMinutes,AirTime,CRSElapsedTime,ActualElapsedTime,Distance,Year,Quarter,Month,DayofMonth,DayOfWeek,Marketing_Airline_Network,Operated_or_Branded_Code_Share_Partners,DOT_ID_Marketing_Airline,IATA_Code_Marketing_Airline,Flight_Number_Marketing_Airline,Operating_Airline,DOT_ID_Operating_Airline,IATA_Code_Operating_Airline,Tail_Number,Flight_Number_Operating_Airline,OriginAirportID,OriginAirportSeqID,OriginCityMarketID,OriginCityName,OriginState,OriginStateFips,OriginStateName,OriginWac,DestAirportID,DestAirportSeqID,DestCityMarketID,DestCityName,DestState,DestStateFips,DestStateName,DestWac,DepDel15,DepartureDelayGroups,DepTimeBlk,TaxiOut,WheelsOff,WheelsOn,TaxiIn,CRSArrTime,ArrDelay,ArrDel15,ArrivalDelayGroups,ArrTimeBlk,DistanceGroup,DivAirportLandings
str,str,str,str,bool,bool,i64,f64,f64,f64,f64,f64,f64,f64,f64,f64,i64,i64,i64,i64,i64,str,str,i64,str,i64,str,i64,str,str,i64,i64,i64,i64,str,str,i64,str,i64,i64,i64,i64,str,str,i64,str,i64,f64,f64,str,f64,f64,f64,f64,i64,f64,f64,f64,str,i64,f64
"""2018-01-23""","""Endeavor Air I…","""ABY""","""ATL""",False,False,1202,1157.0,0.0,-5.0,1256.0,0.0,38.0,62.0,59.0,145.0,2018,1,1,23,2,"""DL""","""DL_CODESHARE""",19790,"""DL""",3298,"""9E""",20363,"""9E""","""N8928A""",3298,10146,1014602,30146,"""Albany, GA""","""GA""",13,"""Georgia""",34,10397,1039707,30397,"""Atlanta, GA""","""GA""",13,"""Georgia""",34,0.0,-1.0,"""1200-1259""",14.0,1211.0,1249.0,7.0,1304,-8.0,0.0,-1.0,"""1300-1359""",1,0.0
"""2018-01-24""","""Endeavor Air I…","""ABY""","""ATL""",False,False,1202,1157.0,0.0,-5.0,1258.0,0.0,36.0,62.0,61.0,145.0,2018,1,1,24,3,"""DL""","""DL_CODESHARE""",19790,"""DL""",3298,"""9E""",20363,"""9E""","""N800AY""",3298,10146,1014602,30146,"""Albany, GA""","""GA""",13,"""Georgia""",34,10397,1039707,30397,"""Atlanta, GA""","""GA""",13,"""Georgia""",34,0.0,-1.0,"""1200-1259""",13.0,1210.0,1246.0,12.0,1304,-6.0,0.0,-1.0,"""1300-1359""",1,0.0
"""2018-01-25""","""Endeavor Air I…","""ABY""","""ATL""",False,False,1202,1153.0,0.0,-9.0,1302.0,0.0,40.0,62.0,69.0,145.0,2018,1,1,25,4,"""DL""","""DL_CODESHARE""",19790,"""DL""",3298,"""9E""",20363,"""9E""","""N8836A""",3298,10146,1014602,30146,"""Albany, GA""","""GA""",13,"""Georgia""",34,10397,1039707,30397,"""Atlanta, GA""","""GA""",13,"""Georgia""",34,0.0,-1.0,"""1200-1259""",18.0,1211.0,1251.0,11.0,1304,-2.0,0.0,-1.0,"""1300-1359""",1,0.0


notice inefficient types (f64,i64)

In [2]:
df.estimated_size(unit='gb')

8.848384218290448

files' size on disk: 7.37 GB

In [15]:
# naive parquetization
df.write_parquet('archive/raw_Combined_Flights.parquet')

# check file size
raw_parquet_size = os.path.getsize('archive/raw_Combined_Flights.parquet')/(1024**3)
raw_parquet_size # Gigabytes

0.7786195445805788

### compare reading parquet times

**pl.scan_pyarrow_dataset**
- considered unstable (may be changed).
- can be useful to connect to cloud or partitioned datasets.
- can only can push down predicates that are allowed by PyArrow (e.g. not the full Polars API).
- If scan_parquet() works for your source, you should use that instead.

In [2]:
# DO NOT RUN THIS CELL UNLESS YOU WANT TO WAIT FOR ~2 MINUTES

def read_parquet():
    return pl.read_parquet('archive/raw_Combined_Flights.parquet')
def read_parquet_pyarrow():
    return pl.read_parquet('archive/raw_Combined_Flights.parquet', use_pyarrow=True)
def scan_parquet():
    return pl.scan_parquet('archive/raw_Combined_Flights.parquet').collect()
def scan_pyarrow_dataset():
    return pl.scan_pyarrow_dataset(ds.dataset("archive/raw_Combined_Flights.parquet")).collect()


read_parquet_time = round(timeit.timeit(read_parquet, number=5))
read_parquet_pyarrow_time = round(timeit.timeit(read_parquet_pyarrow, number=5))
scan_parquet_time = round(timeit.timeit(scan_parquet, number=5))
scan_pyarrow_dataset_time = round(timeit.timeit(scan_pyarrow_dataset, number=5))

print(f"simple_read_parquet: {read_parquet_time} seconds")
print(f"read_parque with pyarrow: {read_parquet_pyarrow_time} seconds")
print(f"scan_parquet: {scan_parquet_time} seconds")
print(f"scan_pyarrow_dataset: {scan_pyarrow_dataset_time} seconds")

simple_read_parquet: 16 seconds
read_parque with pyarrow: 39 seconds
scan_parquet: 8 seconds
scan_pyarrow_dataset: 43 seconds


- simple_read_parquet: 16 seconds
- read_parque with pyarrow: 39 seconds
- scan_parquet: 8 seconds
- scan_pyarrow_dataset: 43 seconds

In [59]:
df = pl.scan_parquet('archive/raw_Combined_Flights.parquet').collect()
df.estimated_size(unit='gb')

8.847930748015642

# Shrinking the dataset

In [60]:
%%timeit
df.with_columns(c("DepTime")+c("ArrTime")-c("ArrDelayMinutes")+c("DepDelayMinutes")-c("CRSElapsedTime"))

21.9 ms ± 5.06 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Numerical dtypes

In [61]:
# automatic reduction of numeric columns to the minimum type.
auto_shrink= df.select(pl.all().shrink_dtype())
auto_shrink.head(3)

FlightDate,Airline,Origin,Dest,Cancelled,Diverted,CRSDepTime,DepTime,DepDelayMinutes,DepDelay,ArrTime,ArrDelayMinutes,AirTime,CRSElapsedTime,ActualElapsedTime,Distance,Year,Quarter,Month,DayofMonth,DayOfWeek,Marketing_Airline_Network,Operated_or_Branded_Code_Share_Partners,DOT_ID_Marketing_Airline,IATA_Code_Marketing_Airline,Flight_Number_Marketing_Airline,Operating_Airline,DOT_ID_Operating_Airline,IATA_Code_Operating_Airline,Tail_Number,Flight_Number_Operating_Airline,OriginAirportID,OriginAirportSeqID,OriginCityMarketID,OriginCityName,OriginState,OriginStateFips,OriginStateName,OriginWac,DestAirportID,DestAirportSeqID,DestCityMarketID,DestCityName,DestState,DestStateFips,DestStateName,DestWac,DepDel15,DepartureDelayGroups,DepTimeBlk,TaxiOut,WheelsOff,WheelsOn,TaxiIn,CRSArrTime,ArrDelay,ArrDel15,ArrivalDelayGroups,ArrTimeBlk,DistanceGroup,DivAirportLandings
str,str,str,str,bool,bool,i16,f32,f32,f32,f32,f32,f32,f32,f32,f32,i16,i8,i8,i8,i8,str,str,i16,str,i16,str,i16,str,str,i16,i16,i32,i32,str,str,i8,str,i8,i16,i32,i32,str,str,i8,str,i8,f32,f32,str,f32,f32,f32,f32,i16,f32,f32,f32,str,i8,f32
"""2018-01-23""","""Endeavor Air I…","""ABY""","""ATL""",False,False,1202,1157.0,0.0,-5.0,1256.0,0.0,38.0,62.0,59.0,145.0,2018,1,1,23,2,"""DL""","""DL_CODESHARE""",19790,"""DL""",3298,"""9E""",20363,"""9E""","""N8928A""",3298,10146,1014602,30146,"""Albany, GA""","""GA""",13,"""Georgia""",34,10397,1039707,30397,"""Atlanta, GA""","""GA""",13,"""Georgia""",34,0.0,-1.0,"""1200-1259""",14.0,1211.0,1249.0,7.0,1304,-8.0,0.0,-1.0,"""1300-1359""",1,0.0
"""2018-01-24""","""Endeavor Air I…","""ABY""","""ATL""",False,False,1202,1157.0,0.0,-5.0,1258.0,0.0,36.0,62.0,61.0,145.0,2018,1,1,24,3,"""DL""","""DL_CODESHARE""",19790,"""DL""",3298,"""9E""",20363,"""9E""","""N800AY""",3298,10146,1014602,30146,"""Albany, GA""","""GA""",13,"""Georgia""",34,10397,1039707,30397,"""Atlanta, GA""","""GA""",13,"""Georgia""",34,0.0,-1.0,"""1200-1259""",13.0,1210.0,1246.0,12.0,1304,-6.0,0.0,-1.0,"""1300-1359""",1,0.0
"""2018-01-25""","""Endeavor Air I…","""ABY""","""ATL""",False,False,1202,1153.0,0.0,-9.0,1302.0,0.0,40.0,62.0,69.0,145.0,2018,1,1,25,4,"""DL""","""DL_CODESHARE""",19790,"""DL""",3298,"""9E""",20363,"""9E""","""N8836A""",3298,10146,1014602,30146,"""Albany, GA""","""GA""",13,"""Georgia""",34,10397,1039707,30397,"""Atlanta, GA""","""GA""",13,"""Georgia""",34,0.0,-1.0,"""1200-1259""",18.0,1211.0,1251.0,11.0,1304,-2.0,0.0,-1.0,"""1300-1359""",1,0.0


In [4]:
auto_shrink.estimated_size(unit='gb')

4.740936428308487

In [63]:
# more efficient data types are... more efficient
%%timeit
auto_shrink.with_columns(c("DepTime")+c("ArrTime")-c("ArrDelayMinutes")+c("DepDelayMinutes")-c("CRSElapsedTime"))

9.45 ms ± 289 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


*"I would caution you to be careful about this method. I prefer artisanal
column cleanup. Tools that do everything automatically seem to work well
80% of the time but cause issues at other times. For example, this does not
use unsigned integers."*

Matt Harrison, Effective polars

In [5]:
types = {
    'uint8': (0, 255),
    'uint16': (0, 65535),
    'uint32': (0, 4294967295),
    'uint64': (0, 18446744073709551615),
    'int8': (-128, 127),
    'int16': (-32768, 32767),
    'int32': (-2147483648, 2147483647),
    'int64': (-9223372036854775808, 9223372036854775807),
    'float32': (-3.4028235e+38, 3.4028235e+38),
    'float64': (-1.7976931348623157e+308, 1.7976931348623157e+308)
}

In [6]:
new_schema = {}
for col in df.columns:
    if df[col].dtype in pl.INTEGER_DTYPES:
        if df[col].min() >= 0:
            if df[col].max() < 255:
                new_schema[col] = pl.UInt8
            elif df[col].max() < 65535:
                new_schema[col] = pl.UInt16
            elif df[col].max() < 4294967295:
                new_schema[col] = pl.UInt32
            else:
                new_schema[col] = pl.UInt64
        else:
            if df[col].min() >= -128 and df[col].max() < 127:
                new_schema[col] = pl.Int8
            elif df[col].min() >= -32768 and df[col].max() < 32767:
                new_schema[col] = pl.Int16
            elif df[col].min() >= -2147483648 and df[col].max() < 2147483647:
                new_schema[col] = pl.Int32
            else:
                new_schema[col] = pl.Int64
    elif df[col].dtype in pl.FLOAT_DTYPES:
        if df[col].min() >= -3.4028235e+38 and  df[col].max() < 3.4028235e+38:
            new_schema[col] = pl.Float32
        else:
            new_schema[col] = pl.Float64
    else:
        new_schema[col] = df[col].dtype
new_schema

{'FlightDate': String,
 'Airline': String,
 'Origin': String,
 'Dest': String,
 'Cancelled': Boolean,
 'Diverted': Boolean,
 'CRSDepTime': UInt16,
 'DepTime': Float32,
 'DepDelayMinutes': Float32,
 'DepDelay': Float32,
 'ArrTime': Float32,
 'ArrDelayMinutes': Float32,
 'AirTime': Float32,
 'CRSElapsedTime': Float32,
 'ActualElapsedTime': Float32,
 'Distance': Float32,
 'Year': UInt16,
 'Quarter': UInt8,
 'Month': UInt8,
 'DayofMonth': UInt8,
 'DayOfWeek': UInt8,
 'Marketing_Airline_Network': String,
 'Operated_or_Branded_Code_Share_Partners': String,
 'DOT_ID_Marketing_Airline': UInt16,
 'IATA_Code_Marketing_Airline': String,
 'Flight_Number_Marketing_Airline': UInt16,
 'Operating_Airline': String,
 'DOT_ID_Operating_Airline': UInt16,
 'IATA_Code_Operating_Airline': String,
 'Tail_Number': String,
 'Flight_Number_Operating_Airline': UInt16,
 'OriginAirportID': UInt16,
 'OriginAirportSeqID': UInt32,
 'OriginCityMarketID': UInt16,
 'OriginCityName': String,
 'OriginState': String,
 'Orig

In [7]:
import polars as pl
df = pl.read_csv(path, dtypes=new_schema)
df.estimated_size(unit='gb')

4.662324987351894

### categorical and enum dtypes

In [8]:
string_df = df.select(c(pl.String))
for col in string_df.columns:
    print(f"{col}: {string_df[col].n_unique()}.-----------------------------examples: {string_df[col].unique().head(3).to_list()}")

FlightDate: 1308.-----------------------------examples: ['2020-12-30', '2020-10-26', '2020-04-17']
Airline: 28.-----------------------------examples: ['Peninsula Airways Inc.', 'American Airlines Inc.', 'Horizon Air']
Origin: 388.-----------------------------examples: ['FCA', 'WRG', 'SBN']
Dest: 388.-----------------------------examples: ['MRY', 'PHF', 'GPT']
Marketing_Airline_Network: 11.-----------------------------examples: ['NK', 'G4', 'F9']
Operated_or_Branded_Code_Share_Partners: 16.-----------------------------examples: ['WN', 'B6', 'AS_CODESHARE']
IATA_Code_Marketing_Airline: 11.-----------------------------examples: ['VX', 'G4', 'AS']
Operating_Airline: 28.-----------------------------examples: ['HA', 'CP', 'AA']
IATA_Code_Operating_Airline: 28.-----------------------------examples: ['PT', 'G7', 'EM']
Tail_Number: 7084.-----------------------------examples: ['N918AK', 'N490PX', 'N840DN']
OriginCityName: 381.-----------------------------examples: ['Hagerstown, MD', 'Laredo, TX'

Enum vs Categorical
* Polars supports two different DataTypes for working with categorical data: Enum and Categorical.
* When the categories are known up front use Enum. 
* When you don't know the categories or they are not fixed then you use Categorical.
* In case your requirements change along the way you can always cast from one to the other.

In [9]:
string_schema = {"FlightDate": pl.Date,
    "Airline": pl.Categorical,
    "Origin": pl.Categorical,
    "Dest": pl.Categorical,
    "Marketing_Airline_Network": pl.Categorical,
    "Operated_or_Branded_Code_Share_Partners": pl.Categorical,
    "IATA_Code_Marketing_Airline": pl.Categorical,
    "Operating_Airline": pl.Categorical,
    "IATA_Code_Operating_Airline": pl.Categorical,
    "Tail_Number": pl.Categorical,
    "OriginCityName": pl.Categorical,
    "OriginState": pl.Categorical,
    "OriginStateName": pl.Categorical,
    "DestCityName": pl.Categorical,
    "DestState": pl.Categorical,
    "DestStateName": pl.Categorical,
    "DepTimeBlk": pl.String,
    "ArrTimeBlk": pl.String,
}
new_schema = {'FlightDate': pl.String,
 'Airline': pl.String,
 'Origin': pl.String,
 'Dest': pl.String,
 'Cancelled': pl.Boolean,
 'Diverted': pl.Boolean,
 'CRSDepTime': pl.UInt16,
 'DepTime': pl.Float32,
 'DepDelayMinutes': pl.Float32,
 'DepDelay': pl.Float32,
 'ArrTime': pl.Float32,
 'ArrDelayMinutes': pl.Float32,
 'AirTime': pl.Float32,
 'CRSElapsedTime': pl.Float32,
 'ActualElapsedTime': pl.Float32,
 'Distance': pl.Float32,
 'Year': pl.UInt16,
 'Quarter': pl.UInt8,
 'Month': pl.UInt8,
 'DayofMonth': pl.UInt8,
 'DayOfWeek': pl.UInt8,
 'Marketing_Airline_Network': pl.String,
 'Operated_or_Branded_Code_Share_Partners': pl.String,
 'DOT_ID_Marketing_Airline': pl.UInt16,
 'IATA_Code_Marketing_Airline': pl.String,
 'Flight_Number_Marketing_Airline': pl.UInt16,
 'Operating_Airline': pl.String,
 'DOT_ID_Operating_Airline': pl.UInt16,
 'IATA_Code_Operating_Airline': pl.String,
 'Tail_Number': pl.String,
 'Flight_Number_Operating_Airline': pl.UInt16,
 'OriginAirportID': pl.UInt16,
 'OriginAirportSeqID': pl.UInt32,
 'OriginCityMarketID': pl.UInt16,
 'OriginCityName': pl.String,
 'OriginState': pl.String,
 'OriginStateFips': pl.UInt8,
 'OriginStateName': pl.String,
 'OriginWac': pl.UInt8,
 'DestAirportID': pl.UInt16,
 'DestAirportSeqID': pl.UInt32,
 'DestCityMarketID': pl.UInt16,
 'DestCityName': pl.String,
 'DestState': pl.String,
 'DestStateFips': pl.UInt8,
 'DestStateName': pl.String,
 'DestWac': pl.UInt8,
 'DepDel15': pl.Float32,
 'DepartureDelayGroups': pl.Float32,
 'DepTimeBlk': pl.String,
 'TaxiOut': pl.Float32,
 'WheelsOff': pl.Float32,
 'WheelsOn': pl.Float32,
 'TaxiIn': pl.Float32,
 'CRSArrTime': pl.UInt16,
 'ArrDelay': pl.Float32,
 'ArrDel15': pl.Float32,
 'ArrivalDelayGroups': pl.Float32,
 'ArrTimeBlk': pl.String,
 'DistanceGroup': pl.UInt8,
 'DivAirportLandings': pl.Float32}

In [10]:
# error
final_schema = {**new_schema, **string_schema}
df = pl.read_csv(path, dtypes=final_schema)
df.estimated_size(unit='gb')

StringCacheMismatchError: cannot compare categoricals coming from different sources, consider setting a global StringCache.

Help: if you're using Python, this may look something like:

    with pl.StringCache():
        # Initialize Categoricals.
        df1 = pl.DataFrame({'a': ['1', '2']}, schema={'a': pl.Categorical})
        df2 = pl.DataFrame({'a': ['1', '3']}, schema={'a': pl.Categorical})
    # Your operations go here.
    pl.concat([df1, df2])

Alternatively, if the performance cost is acceptable, you could just set:

    import polars as pl
    pl.enable_string_cache()

on startup.

* [enable_string_cache](https://docs.pola.rs/py-polars/html/reference/api/polars.enable_string_cache.html)
* [StringCache](https://docs.pola.rs/py-polars/html/reference/api/polars.StringCache.html#polars.StringCache)

In [56]:
schema = {**new_schema, **string_schema}
source='archive/Combined_Flights_*.csv'
destination='archive/Combined_Flights.parquet'
with pl.StringCache():
       df = pl.scan_csv(source, dtypes=schema).collect()

In [58]:
# see the mapping of categorical column
print(df['Airline'].sample(10, seed=1).to_physical())
print(df['Airline'].sample(10, seed=1))


shape: (10,)
Series: 'Airline' [u32]
[
	12
	19
	19
	0
	0
	19
	13
	20
	19
	18
]
shape: (10,)
Series: 'Airline' [cat]
[
	"Frontier Airlines Inc."
	"Southwest Airlines Co."
	"Southwest Airlines Co."
	"American Airlines Inc."
	"American Airlines Inc."
	"Southwest Airlines Co."
	"Envoy Air"
	"Mesa Airlines Inc."
	"Southwest Airlines Co."
	"SkyWest Airlines Inc."
]


## pl.scan_csv().sink_parquet()
##            vs
## pl.scan_csv().collect().write_parquet()

- sink_parquet(): Evaluate the query in streaming mode and write to a Parquet file.

In [29]:
# DO NOT RUN THIS CELL UNLESS YOU WANT TO WAIT FOR ~30 MINUTES

schema = {**new_schema, **string_schema}
source='archive/Combined_Flights_*.csv'
destination='archive/Combined_Flights.parquet'

def basic_manipulation(source, schema):
     with pl.StringCache():
       return (pl.scan_csv(source, dtypes=schema)
        .with_columns(
            c("DepTimeBlk").str.split_exact("-",1).struct.rename_fields(["DepTimeBlkStart", "DepTimeBlkEnd"]),
            c("ArrTimeBlk").str.split_exact("-",1).struct.rename_fields(["ArrTimeBlkStart", "ArrTimeBlkEnd"])
        )
        .unnest("DepTimeBlk", "ArrTimeBlk")
        .cast({"DepTimeBlkStart": pl.Time, "DepTimeBlkEnd": pl.Time, "ArrTimeBlkStart": pl.Time, "ArrTimeBlkEnd": pl.Time})
       )

def csv2parquet_sink(source, schema, destination):
        basic_manipulation(source, schema).sink_parquet(destination)  # <<<<<<<<<<<<<<<<<<<<<<<<<< .sink_parquet()

def csv2parquet_collect_write_parquet(source, schema, destination):
    basic_manipulation(source, schema).collect().write_parquet(destination)  # <<<<<<<<<<<<<<<<<<< .collect().write_parquet()

def csv2parquet_collect_write_parquet_pyarrow(source, schema, destination):
    basic_manipulation(source, schema).collect().write_parquet(destination, use_pyarrow=True) # << .collect().write_parquet(pyause_pyarrowrrow=True)


csv2parquet_sink_time = round(timeit.timeit(csv2parquet_sink, number=3))
csv2parquet_collect_write_parquet_time = round(timeit.timeit(csv2parquet_collect_write_parquet, number=3))
csv2parquet_collect_write_parquet_pyarrow_time = round(timeit.timeit(csv2parquet_collect_write_parquet_pyarrow, number=3))

print(f"csv2parquet_sink: {csv2parquet_sink_time} seconds")
print(f"csv2parquet_collect_write_parquet: {csv2parquet_collect_write_parquet_time} seconds")
print(f"csv2parquet_collect_write_parquet_pyarrow: {csv2parquet_collect_write_parquet_pyarrow_time} seconds")



csv2parquet_sink: 1721 seconds
csv2parquet_collect_write_parquet: 126 seconds


- csv2parquet_sink: 1721 seconds
- csv2parquet_collect_write_parquet: 126 seconds
- csv2parquet_collect_write_parquet_pyarrow: 142 seconds

In [38]:
shrinked_parquet_size = os.path.getsize('archive/shrinked_Combined_Flights.parquet')/(1024**3)
print(f"raw_parquet_size on disk: {raw_parquet_size}, shrinked_parquet_size on disk: {shrinked_parquet_size}")
read_raw = pl.scan_parquet('archive/raw_Combined_Flights.parquet').collect()
read_shrinked = pl.scan_parquet('archive/shrinked_Combined_Flights.parquet').collect()
print(f"loaded_raw_df: {read_raw.estimated_size(unit='gb')}, loaded_shrinked_df: {read_shrinked.estimated_size(unit='gb')}") 


raw_parquet_size on disk: 0.7786195445805788, shrinked_parquet_size on disk: 0.7581249624490738
loaded_raw_df: 8.847930748015642, loaded_shrinked_df: 4.193919369019568


It's possible that the reduction in memory usage due to shrinking data types is being offset by metadata overhead or additional encoding required for the compressed data. Another option is that the encoding is as efficient as type shrinking.

## compression


- compression: {‘lz4’, ‘uncompressed’, ‘snappy’, ‘gzip’, ‘lzo’, ‘brotli’, ‘zstd’}

    - Choose “zstd” for good compression performance.
    - Choose “lz4” for fast compression/decompression.
    - Choose “snappy” for more backwards compatibility guarantees when you deal with older parquet readers.

- compression_level
    The level of compression to use. Higher compression means smaller files on disk.
    - “gzip” : min-level: 0, max-level: 10.
    - “brotli” : min-level: 0, max-level: 11.
    - “zstd” : min-level: 1, max-level: 22.



# partitioning

- Improve query performance
- Data Organization and Management
- Easier maintenance operations
- Parallel Processing
- scalability by distributing data across multiple nodes

In [39]:
# not the way to create partitions
partitions = df.partition_by('Marketing_Airline_Network')
partitions[0].head(3)

FlightDate,Airline,Origin,Dest,Cancelled,Diverted,CRSDepTime,DepTime,DepDelayMinutes,DepDelay,ArrTime,ArrDelayMinutes,AirTime,CRSElapsedTime,ActualElapsedTime,Distance,Year,Quarter,Month,DayofMonth,DayOfWeek,Marketing_Airline_Network,Operated_or_Branded_Code_Share_Partners,DOT_ID_Marketing_Airline,IATA_Code_Marketing_Airline,Flight_Number_Marketing_Airline,Operating_Airline,DOT_ID_Operating_Airline,IATA_Code_Operating_Airline,Tail_Number,Flight_Number_Operating_Airline,OriginAirportID,OriginAirportSeqID,OriginCityMarketID,OriginCityName,OriginState,OriginStateFips,OriginStateName,OriginWac,DestAirportID,DestAirportSeqID,DestCityMarketID,DestCityName,DestState,DestStateFips,DestStateName,DestWac,DepDel15,DepartureDelayGroups,DepTimeBlkStart,DepTimeBlkEnd,TaxiOut,WheelsOff,WheelsOn,TaxiIn,CRSArrTime,ArrDelay,ArrDel15,ArrivalDelayGroups,ArrTimeBlkStart,ArrTimeBlkEnd,DistanceGroup,DivAirportLandings
date,cat,cat,cat,bool,bool,u16,f32,f32,f32,f32,f32,f32,f32,f32,f32,u16,u8,u8,u8,u8,cat,cat,u16,cat,u16,cat,u16,cat,cat,u16,u16,u32,u16,cat,cat,u8,cat,u8,u16,u32,u16,cat,cat,u8,cat,u8,f32,f32,time,time,f32,f32,f32,f32,u16,f32,f32,f32,time,time,u8,f32
2018-01-23,"""Endeavor Air I…","""ABY""","""ATL""",False,False,1202,1157.0,0.0,-5.0,1256.0,0.0,38.0,62.0,59.0,145.0,2018,1,1,23,2,"""DL""","""DL_CODESHARE""",19790,"""DL""",3298,"""9E""",20363,"""9E""","""N8928A""",3298,10146,1014602,30146,"""Albany, GA""","""GA""",13,"""Georgia""",34,10397,1039707,30397,"""Atlanta, GA""","""GA""",13,"""Georgia""",34,0.0,-1.0,00:00:00.000001200,00:00:00.000001259,14.0,1211.0,1249.0,7.0,1304,-8.0,0.0,-1.0,00:00:00.000001300,00:00:00.000001359,1,0.0
2018-01-24,"""Endeavor Air I…","""ABY""","""ATL""",False,False,1202,1157.0,0.0,-5.0,1258.0,0.0,36.0,62.0,61.0,145.0,2018,1,1,24,3,"""DL""","""DL_CODESHARE""",19790,"""DL""",3298,"""9E""",20363,"""9E""","""N800AY""",3298,10146,1014602,30146,"""Albany, GA""","""GA""",13,"""Georgia""",34,10397,1039707,30397,"""Atlanta, GA""","""GA""",13,"""Georgia""",34,0.0,-1.0,00:00:00.000001200,00:00:00.000001259,13.0,1210.0,1246.0,12.0,1304,-6.0,0.0,-1.0,00:00:00.000001300,00:00:00.000001359,1,0.0
2018-01-25,"""Endeavor Air I…","""ABY""","""ATL""",False,False,1202,1153.0,0.0,-9.0,1302.0,0.0,40.0,62.0,69.0,145.0,2018,1,1,25,4,"""DL""","""DL_CODESHARE""",19790,"""DL""",3298,"""9E""",20363,"""9E""","""N8836A""",3298,10146,1014602,30146,"""Albany, GA""","""GA""",13,"""Georgia""",34,10397,1039707,30397,"""Atlanta, GA""","""GA""",13,"""Georgia""",34,0.0,-1.0,00:00:00.000001200,00:00:00.000001259,18.0,1211.0,1251.0,11.0,1304,-2.0,0.0,-1.0,00:00:00.000001300,00:00:00.000001359,1,0.0


## write partitioned parquet

In [32]:
import pyarrow.dataset as ds
ds.write_dataset(
    read_shrinked.to_arrow(),
    "/Users/igormintz/Documents/polars_parquet/hive_1",
    format="parquet",
    partitioning=["Marketing_Airline_Network"],
    partitioning_flavor="hive",
    existing_data_behavior="overwrite_or_ignore",
)

## write partitioned parquet (nested)

In [33]:
ds.write_dataset(
    read_shrinked.to_arrow(),
    "/Users/igormintz/Documents/polars_parquet/hive_2",
    format="parquet",
    partitioning=["Marketing_Airline_Network", "Airline"],
    partitioning_flavor="hive",
    existing_data_behavior="overwrite_or_ignore",
)

## read partitioned parquet

In [46]:
(
    pl.scan_parquet("/Users/igormintz/Documents/polars_parquet/hive_1/*/*.parquet")
    .filter(pl.col("Marketing_Airline_Network") == "AA")
    .collect()
    .head(3)
)


FlightDate,Airline,Origin,Dest,Cancelled,Diverted,CRSDepTime,DepTime,DepDelayMinutes,DepDelay,ArrTime,ArrDelayMinutes,AirTime,CRSElapsedTime,ActualElapsedTime,Distance,Year,Quarter,Month,DayofMonth,DayOfWeek,Operated_or_Branded_Code_Share_Partners,DOT_ID_Marketing_Airline,IATA_Code_Marketing_Airline,Flight_Number_Marketing_Airline,Operating_Airline,DOT_ID_Operating_Airline,IATA_Code_Operating_Airline,Tail_Number,Flight_Number_Operating_Airline,OriginAirportID,OriginAirportSeqID,OriginCityMarketID,OriginCityName,OriginState,OriginStateFips,OriginStateName,OriginWac,DestAirportID,DestAirportSeqID,DestCityMarketID,DestCityName,DestState,DestStateFips,DestStateName,DestWac,DepDel15,DepartureDelayGroups,DepTimeBlkStart,DepTimeBlkEnd,TaxiOut,WheelsOff,WheelsOn,TaxiIn,CRSArrTime,ArrDelay,ArrDel15,ArrivalDelayGroups,ArrTimeBlkStart,ArrTimeBlkEnd,DistanceGroup,DivAirportLandings,Marketing_Airline_Network
date,cat,cat,cat,bool,bool,u16,f32,f32,f32,f32,f32,f32,f32,f32,f32,u16,u8,u8,u8,u8,cat,u16,cat,u16,cat,u16,cat,cat,u16,u16,u32,u16,cat,cat,u8,cat,u8,u16,u32,u16,cat,cat,u8,cat,u8,f32,f32,time,time,f32,f32,f32,f32,u16,f32,f32,f32,time,time,u8,f32,str
2018-10-06,"""American Airli…","""PHL""","""CLT""",False,False,1340,1350.0,10.0,10.0,1526.0,0.0,74.0,106.0,96.0,449.0,2018,4,10,6,6,"""AA""",19805,"""AA""",793,"""AA""",19805,"""AA""","""N816AW""",793,14100,1410005,34100,"""Philadelphia, …","""PA""",42,"""Pennsylvania""",23,11057,1105703,31057,"""Charlotte, NC""","""NC""",37,"""North Carolina…",36,0.0,0.0,00:00:00.000001300,00:00:00.000001359,17.0,1407.0,1521.0,5.0,1526,0.0,0.0,0.0,00:00:00.000001500,00:00:00.000001559,2,0.0,"""AA"""
2018-10-07,"""American Airli…","""PHL""","""CLT""",False,False,1340,1803.0,263.0,263.0,1940.0,254.0,68.0,106.0,97.0,449.0,2018,4,10,7,7,"""AA""",19805,"""AA""",793,"""AA""",19805,"""AA""","""N117UW""",793,14100,1410005,34100,"""Philadelphia, …","""PA""",42,"""Pennsylvania""",23,11057,1105703,31057,"""Charlotte, NC""","""NC""",37,"""North Carolina…",36,1.0,12.0,00:00:00.000001300,00:00:00.000001359,25.0,1828.0,1936.0,4.0,1526,254.0,1.0,12.0,00:00:00.000001500,00:00:00.000001559,2,0.0,"""AA"""
2018-10-08,"""American Airli…","""PHL""","""CLT""",False,False,1340,1502.0,82.0,82.0,1641.0,75.0,66.0,106.0,99.0,449.0,2018,4,10,8,1,"""AA""",19805,"""AA""",793,"""AA""",19805,"""AA""","""N658AW""",793,14100,1410005,34100,"""Philadelphia, …","""PA""",42,"""Pennsylvania""",23,11057,1105703,31057,"""Charlotte, NC""","""NC""",37,"""North Carolina…",36,1.0,5.0,00:00:00.000001300,00:00:00.000001359,16.0,1518.0,1624.0,17.0,1526,75.0,1.0,5.0,00:00:00.000001500,00:00:00.000001559,2,0.0,"""AA"""


In [47]:
# sometimes `pl.scan_parquet` is not working due to types issues, so we can use `pl.scan_pyarrow_dataset
import pyarrow.dataset as ds
dset = ds.dataset("/Users/igormintz/Documents/polars_parquet/hive_1",
                  format="parquet",
                  partitioning="hive")
(
    pl.scan_pyarrow_dataset(dset)
    .filter(pl.col("Marketing_Airline_Network") == "AA")
    .collect()
    .head(3)
)

FlightDate,Airline,Origin,Dest,Cancelled,Diverted,CRSDepTime,DepTime,DepDelayMinutes,DepDelay,ArrTime,ArrDelayMinutes,AirTime,CRSElapsedTime,ActualElapsedTime,Distance,Year,Quarter,Month,DayofMonth,DayOfWeek,Operated_or_Branded_Code_Share_Partners,DOT_ID_Marketing_Airline,IATA_Code_Marketing_Airline,Flight_Number_Marketing_Airline,Operating_Airline,DOT_ID_Operating_Airline,IATA_Code_Operating_Airline,Tail_Number,Flight_Number_Operating_Airline,OriginAirportID,OriginAirportSeqID,OriginCityMarketID,OriginCityName,OriginState,OriginStateFips,OriginStateName,OriginWac,DestAirportID,DestAirportSeqID,DestCityMarketID,DestCityName,DestState,DestStateFips,DestStateName,DestWac,DepDel15,DepartureDelayGroups,DepTimeBlkStart,DepTimeBlkEnd,TaxiOut,WheelsOff,WheelsOn,TaxiIn,CRSArrTime,ArrDelay,ArrDel15,ArrivalDelayGroups,ArrTimeBlkStart,ArrTimeBlkEnd,DistanceGroup,DivAirportLandings,Marketing_Airline_Network
date,cat,cat,cat,bool,bool,u16,f32,f32,f32,f32,f32,f32,f32,f32,f32,u16,u8,u8,u8,u8,cat,u16,cat,u16,cat,u16,cat,cat,u16,u16,u32,u16,cat,cat,u8,cat,u8,u16,u32,u16,cat,cat,u8,cat,u8,f32,f32,time,time,f32,f32,f32,f32,u16,f32,f32,f32,time,time,u8,f32,str
2018-10-06,"""American Airli…","""PHL""","""CLT""",False,False,1340,1350.0,10.0,10.0,1526.0,0.0,74.0,106.0,96.0,449.0,2018,4,10,6,6,"""AA""",19805,"""AA""",793,"""AA""",19805,"""AA""","""N816AW""",793,14100,1410005,34100,"""Philadelphia, …","""PA""",42,"""Pennsylvania""",23,11057,1105703,31057,"""Charlotte, NC""","""NC""",37,"""North Carolina…",36,0.0,0.0,00:00:00.000001300,00:00:00.000001359,17.0,1407.0,1521.0,5.0,1526,0.0,0.0,0.0,00:00:00.000001500,00:00:00.000001559,2,0.0,"""AA"""
2018-10-07,"""American Airli…","""PHL""","""CLT""",False,False,1340,1803.0,263.0,263.0,1940.0,254.0,68.0,106.0,97.0,449.0,2018,4,10,7,7,"""AA""",19805,"""AA""",793,"""AA""",19805,"""AA""","""N117UW""",793,14100,1410005,34100,"""Philadelphia, …","""PA""",42,"""Pennsylvania""",23,11057,1105703,31057,"""Charlotte, NC""","""NC""",37,"""North Carolina…",36,1.0,12.0,00:00:00.000001300,00:00:00.000001359,25.0,1828.0,1936.0,4.0,1526,254.0,1.0,12.0,00:00:00.000001500,00:00:00.000001559,2,0.0,"""AA"""
2018-10-08,"""American Airli…","""PHL""","""CLT""",False,False,1340,1502.0,82.0,82.0,1641.0,75.0,66.0,106.0,99.0,449.0,2018,4,10,8,1,"""AA""",19805,"""AA""",793,"""AA""",19805,"""AA""","""N658AW""",793,14100,1410005,34100,"""Philadelphia, …","""PA""",42,"""Pennsylvania""",23,11057,1105703,31057,"""Charlotte, NC""","""NC""",37,"""North Carolina…",36,1.0,5.0,00:00:00.000001300,00:00:00.000001359,16.0,1518.0,1624.0,17.0,1526,75.0,1.0,5.0,00:00:00.000001500,00:00:00.000001559,2,0.0,"""AA"""


# Cloud
``` bash
pip install fsspec  
pip install s3fs    # S3Fs is a Pythonic file interface to S3. It builds on top of botocore.
pip install adlfs   # Filesystem interface to Azure-Datalake Gen1 and Gen2 Storage
pip install gcsfs   # GCSFS is aPythonic file-system for Google Cloud Storage
```

## Upload to S3

In [None]:
import s3fs
bucket_name = "polars-parquets"
parquet_key = "test_write.parquet"

fs = s3fs.S3FileSystem(secret='...', token='...',)
destination = "s3://bucket/my_file.parquet"

# write parquet
with fs.open(f"{bucket_name}/{parquet_key}", mode="wb") as f:
    df.write_parquet(f)

## Download from S3

In [53]:
source = "s3://polars-parquets/hive_1/*/*.parquet"

with open("/Users/igormintz/Documents/secret.txt") as f:
    access_key_id = f.readline().strip()
    access_key = f.readline().strip()

storage_options = {
    "aws_access_key_id": access_key_id,
    "aws_secret_access_key": access_key,
    "aws_region": "eu-north-1",
}
df = (
    pl.scan_parquet(source, storage_options=storage_options)
    .filter(pl.col("Marketing_Airline_Network") == "AA")
    .collect()
)
df.head(3)

FlightDate,Airline,Origin,Dest,Cancelled,Diverted,CRSDepTime,DepTime,DepDelayMinutes,DepDelay,ArrTime,ArrDelayMinutes,AirTime,CRSElapsedTime,ActualElapsedTime,Distance,Year,Quarter,Month,DayofMonth,DayOfWeek,Operated_or_Branded_Code_Share_Partners,DOT_ID_Marketing_Airline,IATA_Code_Marketing_Airline,Flight_Number_Marketing_Airline,Operating_Airline,DOT_ID_Operating_Airline,IATA_Code_Operating_Airline,Tail_Number,Flight_Number_Operating_Airline,OriginAirportID,OriginAirportSeqID,OriginCityMarketID,OriginCityName,OriginState,OriginStateFips,OriginStateName,OriginWac,DestAirportID,DestAirportSeqID,DestCityMarketID,DestCityName,DestState,DestStateFips,DestStateName,DestWac,DepDel15,DepartureDelayGroups,DepTimeBlkStart,DepTimeBlkEnd,TaxiOut,WheelsOff,WheelsOn,TaxiIn,CRSArrTime,ArrDelay,ArrDel15,ArrivalDelayGroups,ArrTimeBlkStart,ArrTimeBlkEnd,DistanceGroup,DivAirportLandings,Marketing_Airline_Network
date,cat,cat,cat,bool,bool,u16,f32,f32,f32,f32,f32,f32,f32,f32,f32,u16,u8,u8,u8,u8,cat,u16,cat,u16,cat,u16,cat,cat,u16,u16,u32,u16,cat,cat,u8,cat,u8,u16,u32,u16,cat,cat,u8,cat,u8,f32,f32,time,time,f32,f32,f32,f32,u16,f32,f32,f32,time,time,u8,f32,str
2018-10-06,"""American Airli…","""PHL""","""CLT""",False,False,1340,1350.0,10.0,10.0,1526.0,0.0,74.0,106.0,96.0,449.0,2018,4,10,6,6,"""AA""",19805,"""AA""",793,"""AA""",19805,"""AA""","""N816AW""",793,14100,1410005,34100,"""Philadelphia, …","""PA""",42,"""Pennsylvania""",23,11057,1105703,31057,"""Charlotte, NC""","""NC""",37,"""North Carolina…",36,0.0,0.0,00:00:00.000001300,00:00:00.000001359,17.0,1407.0,1521.0,5.0,1526,0.0,0.0,0.0,00:00:00.000001500,00:00:00.000001559,2,0.0,"""AA"""
2018-10-07,"""American Airli…","""PHL""","""CLT""",False,False,1340,1803.0,263.0,263.0,1940.0,254.0,68.0,106.0,97.0,449.0,2018,4,10,7,7,"""AA""",19805,"""AA""",793,"""AA""",19805,"""AA""","""N117UW""",793,14100,1410005,34100,"""Philadelphia, …","""PA""",42,"""Pennsylvania""",23,11057,1105703,31057,"""Charlotte, NC""","""NC""",37,"""North Carolina…",36,1.0,12.0,00:00:00.000001300,00:00:00.000001359,25.0,1828.0,1936.0,4.0,1526,254.0,1.0,12.0,00:00:00.000001500,00:00:00.000001559,2,0.0,"""AA"""
2018-10-08,"""American Airli…","""PHL""","""CLT""",False,False,1340,1502.0,82.0,82.0,1641.0,75.0,66.0,106.0,99.0,449.0,2018,4,10,8,1,"""AA""",19805,"""AA""",793,"""AA""",19805,"""AA""","""N658AW""",793,14100,1410005,34100,"""Philadelphia, …","""PA""",42,"""Pennsylvania""",23,11057,1105703,31057,"""Charlotte, NC""","""NC""",37,"""North Carolina…",36,1.0,5.0,00:00:00.000001300,00:00:00.000001359,16.0,1518.0,1624.0,17.0,1526,75.0,1.0,5.0,00:00:00.000001500,00:00:00.000001559,2,0.0,"""AA"""
