# Chapter 2: Reading and Writing Files

## Reading and writing CSV files

### Getting ready

In [None]:
import polars as pl

### How to do it...

In [None]:
df = pl.read_csv('../data/customer_shopping_data.csv')
df.head()

In [None]:
df = pl.read_csv('../data/customer_shopping_data_no_header.csv')
df.head()

In [None]:
df = pl.read_csv('../data/customer_shopping_data_no_header.csv', has_header=False)
df.head()

In [None]:
column_names = ['invoice_no', 'customer_id', 'gender', 'age', 'category', 'quantity', 'price', 'payment_method', 'invoice_date', 'shopping_mall']
df = pl.read_csv('../data/customer_shopping_data_no_header.csv', 
                 has_header=False, 
                 new_columns=column_names)
df.head()

In [None]:
df = pl.read_csv('../data/customer_shopping_data_no_header.csv', 
                 has_header=False, 
                 new_columns=column_names, 
                 try_parse_dates=True)
df.head()

In [None]:
df = pl.read_csv('../data/customer_shopping_data_no_header.csv', 
                 has_header=False, 
                 new_columns=column_names, 
                 try_parse_dates=True, 
                 dtypes={'age': pl.Int8, 'quantity': pl.Int32})
df.head()

In [None]:
df.write_csv('../data/output/shopping_data_output.csv', 
             has_header=False, 
             separator=',')

### There is more...

In [None]:
lf = pl.scan_csv('../data/customer_shopping_data_no_header.csv', 
                 has_header=False, 
                 new_columns=column_names, 
                 try_parse_dates=True, 
                 dtypes={'age': pl.Int8, 'quantity': pl.Int32})
lf.fetch(5)

In [None]:
lf.sink_csv('../data/output/shopping_data_output_sink.csv')

## Reading and writing parquet files

### How to do it...

In [None]:
parquet_input_file_path = '../data/venture_funding_deals.parquet'
df = pl.read_parquet(parquet_input_file_path, 
                     columns=['Company', 'Amount', 'Valuation', 'Industry'], 
                     row_count_name='row_cnt')
df.head()

In [None]:
pl.read_parquet_schema(parquet_input_file_path)

In [None]:
parquet_output_file_path = '../data/output/venture_funding_deals_output.parquet'
df.write_parquet(parquet_output_file_path, compression='zstd', compression_level=10)

In [None]:
lf = pl.scan_parquet(parquet_input_file_path)
lf.collect().head()

In [None]:
lf.sink_parquet(parquet_output_file_path, maintain_order=False)

### There is more...

In [None]:
partitioned_parquet_input_file_path = '../data/venture_funding_deals_partitioned'
df = pl.read_parquet(
    partitioned_parquet_input_file_path, 
    use_pyarrow=True, 
    pyarrow_options={'partitioning': 'hive'}
)
df.head()

In [None]:
partitioned_parquet_output_file_path = '../data/output/venture_funding_deals_partitioned_output'
df.write_parquet(
    partitioned_parquet_output_file_path, 
    use_pyarrow=True, 
    pyarrow_options={
        'partition_cols': ['Industry'],
        'existing_data_behavior': 'overwrite_or_ignore'
        }
)

## Reading and writing Delta Lake tables

### How to do it...

In [None]:
delta_input_file_path = '../data/venture_funding_deals_delta'
df = pl.read_delta(delta_input_file_path)
df.head()

In [None]:
lf = pl.scan_delta(delta_input_file_path)
lf.collect().head()

In [None]:
df.write_delta('../data/output/venture_funding_deals_delta_output', mode='overwrite')

In [None]:
delta_partitioned_output_file_path = '../data/output/venture_funding_deals_delta_partitioned_output'
delta_write_options = {'partition_by': 'Industry'}
df.write_delta(
    delta_partitioned_output_file_path, 
    mode='overwrite', 
    delta_write_options=delta_write_options
)

In [None]:
df = pl.read_delta(delta_partitioned_output_file_path)
df.head()

In [None]:
df = pl.read_delta(
    delta_partitioned_output_file_path, 
    pyarrow_options={'partitions': [('Industry', '=', 'Accounting')]}
)
df.head()

### There is more...

In [None]:
from config import aws_access_key_id, aws_secret_access_key

In [None]:
table_path = 's3://sandbox-data-lake/letters_delta'
storage_options= {
    'aws_access_key_id': aws_access_key_id,
    'aws_secret_access_key': aws_secret_access_key,
    'aws_region': 'us-west-1'
}

table_path = 's3://YOUR_S3BUCKET_URI/YOUR_DELTA_TABLE'
storage_options= {
    'aws_access_key_id': 'YOUR_ACCESS_KEY',
    'aws_secret_access_key': 'YOUR_SECRET_ACCESS_KEY',
    'aws_region': 'YOUR_REGION'
}

df = pl.read_delta(table_path, storage_options=storage_options)  
df.head()

## Reading and writing JSON files

### Getting ready

### How to do it...

In [None]:
df = pl.read_json('../data/world_population.json')
df.select(df.columns[:10]).head()

In [None]:
df.write_json('../data/output/world_population_output.json')

In [None]:
df = pl.read_ndjson('../data/world_population.jsonl')
df.select(df.columns[:10]).head()

In [None]:
df.write_ndjson('../data/output/world_population_output.jsonl')

### There is more...

In [None]:
lf = pl.scan_ndjson('../data/world_population.jsonl')
lf.select(lf.columns[:10]).collect().head()

## Reading and writing Excel files

### Getting ready

In [None]:
import polars as pl

### How to do it...

In [None]:
input_file_path = '../data/financial_sample.xlsx'
df = pl.read_excel(
    input_file_path, 
    sheet_name='Sheet1',
    read_csv_options={'has_header': True, 'try_parse_dates': True}
)
df.head()

In [None]:
output_file_path = '../data/output/financial_sample_output.xlsx'
df.write_excel(
    output_file_path,
    worksheet='Output Sheet1',
    header_format={'bold': True}
)

## Reading and writing other file formats

### How to do it...

In [9]:
import polars as pl

In [8]:
csv_input_file_path = '../data/customer_shopping_data.csv'
ipc_file_path = '../data/customer_shopping_data.arrow'
df = pl.read_csv(csv_input_file_path)
df.write_ipc(ipc_file_path)

In [None]:
df = pl.read_ipc(ipc_file_path)
df.head()

In [None]:
avro_file_path = '../data/customer_shopping_data.avro'
df = pl.read_csv(csv_input_file_path)
df.write_avro(avro_file_path)

In [None]:
df = pl.read_avro(avro_file_path)
df.head()

In [None]:
import duckdb

duckdb.sql('INSTALL iceberg;')
duckdb.sql('LOAD iceberg;')
# duckdb.sql('SELECT count(*) FROM iceberg_scan("../data/lineitem_iceberg", allow_moved_paths = true);')
duckdb.sql('SELECT * FROM iceberg_scan("../data/lineitem_iceberg", allow_moved_paths = true) limit 10;')


In [None]:
# iceberg_input_file_path = '../data/iceberg_table/metadata/00001-41687cbb-3a0c-4ef3-b3fa-e7026ed2eb77.metadata.json'
# iceberg_input_file_path = '../data/iceberg_table/data/bvwNRQ/category=food/id_bucket=4/20231021_214808_00048_4bpqv-471f34de-a5e9-47a9-b2c6-c22c2a385ad8.parquet'
iceberg_input_file_path = '../data/lineitem_iceberg/metadata/v1.metadata.json'
lf = pl.scan_iceberg(iceberg_input_file_path)


In [None]:
iceberg_input_file_path = 's3://sandbox-data-lake/iceberg-folder/metadata/00001-41687cbb-3a0c-4ef3-b3fa-e7026ed2eb77.metadata.json'
storage_options= {
    'aws_access_key_id': aws_access_key_id,
    'aws_secret_access_key': aws_secret_access_key,
    'aws_region': 'us-west-1'
}

lf = pl.scan_iceberg(iceberg_input_file_path, storage_options=storage_options)  
lf.collect().head()

### There is more...

In [10]:
lf = pl.scan_ipc(ipc_file_path)

invoice_no,customer_id,gender,age,category,quantity,price,payment_method,invoice_date,shopping_mall
str,str,str,i64,str,i64,f64,str,str,str
"""I138884""","""C241288""","""Female""",28,"""Clothing""",5,1500.4,"""Credit Card""","""5/8/2022""","""Kanyon"""
"""I317333""","""C111565""","""Male""",21,"""Shoes""",3,1800.51,"""Debit Card""","""12/12/2021""","""Forum Istanbul…"
"""I127801""","""C266599""","""Male""",20,"""Clothing""",1,300.08,"""Cash""","""9/11/2021""","""Metrocity"""
"""I173702""","""C988172""","""Female""",66,"""Shoes""",5,3000.85,"""Credit Card""","""16/05/2021""","""Metropol AVM"""
"""I337046""","""C189076""","""Female""",53,"""Books""",4,60.6,"""Cash""","""24/10/2021""","""Kanyon"""


In [11]:
lf.sink_ipc('../data/output/customer_shopping_data.arrow')

InvalidOperationError: sink_Ipc(IpcWriterOptions { compression: Some(ZSTD), maintain_order: true }) not yet supported in standard engine. Use 'collect().write_parquet()'

In [13]:
lf.collect().write_ipc('../data/output/customer_shopping_data.arrow')

In [14]:
lf = pl.scan_csv(csv_input_file_path)
lf.sink_ipc('../data/output/customer_shopping_data_from_csv.arrow')

## Reading and writing multiple files

### How to do it...

In [None]:
data = {'Letter': ['A','B','C'], 'Value': [1,2,3]}
df = pl.DataFrame(data)

In [None]:
dfs = df.group_by('Letter')
print(dfs)

In [None]:
for name, df in dfs:
    df.write_csv(f'../data/output/letter_{name}.csv')

In [None]:
df = pl.read_csv('../data/output/letter_*.csv')
df.head()

In [None]:
lf = pl.scan_csv('../data/output/letter_*.csv')
lf.collect().head()

### There is more...

In [None]:
import glob
lfs = [pl.scan_csv(file) for file in glob.glob('../data/output/letter_*.csv')]
dfs = pl.collect_all(lfs)
dfs

## Working with databases

### How to do it...

In [4]:
import polars as pl

In [61]:
from config import postgres_pass, postgres_user

In [62]:
# connectorx is required
uri = f'postgres://{postgres_user}:{postgres_pass}@localhost:5432/postgres' 
query = 'SELECT * FROM sandbox.cars'
df = pl.read_database_uri(query, uri)
df.head()

brand,model,year
str,str,i32
"""Volvo""","""p1800""",1968
"""BMW""","""M1""",1978
"""Toyota""","""Celica""",1975


In [63]:
# pip install adbc-driver-postgresql pyarrow
df = pl.read_database_uri(query, uri, engine='adbc')
df.head()

brand,model,year
str,str,i32
"""Volvo""","""p1800""",1968
"""BMW""","""M1""",1978
"""Toyota""","""Celica""",1975


In [64]:
df = pl.read_database(query, connection=uri)
df.head()

  df = pl.read_database(query, connection=uri)


brand,model,year
str,str,i32
"""Volvo""","""p1800""",1968
"""BMW""","""M1""",1978
"""Toyota""","""Celica""",1975


In [65]:
# pip install sqlalchemy pg8000 or psycopg2 (default is psycopg2)
from sqlalchemy import create_engine

con_string = f'postgresql+pg8000://{postgres_user}:{postgres_pass}@localhost:5432/postgres' 
engine = create_engine(con_string)
conn = engine.connect()

df = pl.read_database(query, connection=conn)
df.head()

brand,model,year
str,str,i64
"""Volvo""","""p1800""",1968
"""BMW""","""M1""",1978
"""Toyota""","""Celica""",1975


In [77]:
df.write_database(table_name="sandbox.cars_output", connection=uri, engine="adbc", if_exists='replace')

ProgrammingError: INVALID_ARGUMENT: [libpq] Failed to create table: ERROR:  relation "sandbox.cars_output" already exists

Query was: CREATE TABLE "public" . "sandbox.cars_output" ("brand" TEXT, "model" TEXT, "year" BIGINT). SQLSTATE: 42P07

In [76]:
df.write_database(table_name="sandbox.cars_output", connection=con_string, engine="sqlalchemy", if_exists='replace')

In [72]:
df = pl.read_database_uri('select * from sandbox.cars_output', uri, engine='adbc')
df.head()

brand,model,year
str,str,i64
"""Volvo""","""p1800""",1968
"""BMW""","""M1""",1978
"""Toyota""","""Celica""",1975
