In [None]:
import os
from dotenv import find_dotenv, load_dotenv

# Pull settings from a .env file when one exists. If none is found, report it and
# carry on: the variables may already be set in the process environment.
try:
    env_file = find_dotenv(raise_error_if_not_found=True)
    load_dotenv(env_file)
except IOError as missing_env_err:
    print(missing_env_err)

# Strict lookups: a KeyError here means a required MinIO setting is missing
# from both the .env file and the environment.
MINIO_ENDPOINT = os.environ['MINIO_ENDPOINT']      # S3-compatible endpoint URL
MINIO_ACCESS_KEY = os.environ['MINIO_ACCESS_KEY']  # access key id
MINIO_SECRET_KEY = os.environ['MINIO_SECRET_KEY']  # secret key (never print this)

In [None]:
import polars as pl
data_url = 'https://storage.googleapis.com/covid19-open-data/v3/epidemiology.csv'

# Full column -> dtype mapping. Passing `schema=` tells Polars to skip type
# inference, so it must list every column of the CSV. The resulting frame's
# schema is later reused as the Delta table schema.
schema = {
    'date': pl.Date,
    'location_key': pl.String,
    'new_confirmed': pl.Int64,
    'new_deceased': pl.Int64,
    'new_recovered': pl.Int64,
    'new_tested': pl.Int64,
    'cumulative_confirmed': pl.Int64,
    'cumulative_deceased': pl.Int64,
    'cumulative_recovered': pl.Int64,
    'cumulative_tested': pl.Int64,
}
df = pl.read_csv(data_url, schema=schema)

# Fixed seed so the preview shows the same rows on every Restart & Run All.
df.sample(10, seed=42)

In [None]:
from deltalake import DeltaTable

# MinIO connection parameters passed to delta-rs' S3 object store.
storage_options = {
    'endpoint_url': MINIO_ENDPOINT,
    'AWS_ACCESS_KEY_ID': MINIO_ACCESS_KEY,
    'AWS_SECRET_ACCESS_KEY': MINIO_SECRET_KEY,
    # ETag-based conditional PUTs let delta-rs commit safely with concurrent writers
    # on S3-compatible stores; see
    # https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/#enabling-concurrent-writes-for-alternative-clients
    'conditional_put': 'etag',
}
bucket_name = 'deltalake-demo'
dtable_schema_name = 'covid'  # table directory inside the bucket (see table_uri below)

# Reuse the Polars frame's schema, converted to PyArrow, as the Delta table schema.
dtable_schema = df.to_arrow().schema

# mode='ignore' makes this cell idempotent. If the table already exists it is left
# untouched and opened, instead of raising (the default mode='error') on a re-run.
dtable = DeltaTable.create(
    table_uri=f's3a://{bucket_name}/{dtable_schema_name}',
    schema=dtable_schema,
    mode='ignore',
    storage_options=storage_options,
)

In [None]:
# Overwrite rather than append, so re-running the notebook does not add another
# full copy of the dataset to the table each time.
df.write_delta(dtable, mode='overwrite')

In [None]:
# Lazy view over the Delta table, read through PyArrow; rows are only read on collect().
ldf = pl.scan_delta(dtable, use_pyarrow=True)
ldf.collect_schema()  # resolved column names and dtypes of the scan

In [None]:
# First ten rows of the table (LazyFrame.limit is an alias of LazyFrame.head).
ldf.limit(10).collect()

In [None]:
# Preview rows from July 2022 (half-open range [2022-07-01, 2022-08-01)).
# head() is applied to the LazyFrame before collect(), so the limit is part of the
# query plan. The optimizer can then push it down instead of materialising every
# matching row first.
ldf.filter(
    (pl.col('date') >= pl.date(2022, 7, 1)) &
    (pl.col('date') < pl.date(2022, 8, 1))
).head(10).collect()

In [None]:
# Mean daily new confirmed cases per location for July 2022.
# group_by() does not guarantee group order by default, so sort by key before
# taking the first ten. Otherwise the preview shows arbitrary locations on each run.
ldf.filter(
    (pl.col('date') >= pl.date(2022, 7, 1)) &
    (pl.col('date') < pl.date(2022, 8, 1))
).group_by('location_key').agg(
    pl.col('new_confirmed').mean()
).sort('location_key').head(10).collect()

In [None]:
# The same Delta table exposed as a PyArrow Dataset, for PyArrow-native querying.
ds = dtable.to_pyarrow_dataset()
ds.schema

In [None]:
import pyarrow as pa
import pyarrow.compute as pc
from datetime import date

# July 2022 as a half-open date range, expressed as a PyArrow dataset predicate.
july_start = pa.scalar(date(2022, 7, 1), type=pa.date32())
august_start = pa.scalar(date(2022, 8, 1), type=pa.date32())
in_july_2022 = (pc.field('date') >= july_start) & (pc.field('date') < august_start)

july_table = ds.filter(in_july_2022).to_table()

# Mean of new_confirmed per location; the output column is named 'new_confirmed_mean'.
result = july_table.group_by('location_key').aggregate([('new_confirmed', 'mean')])
result

In [None]:
# PyArrow's threaded group_by gives no stable output order, so sort by key before
# previewing to keep this output reproducible across runs.
pl.DataFrame(result).sort('location_key').head(10)

In [None]:
# Bin-pack the table's data files into fewer, larger files (a new table version is
# committed); the metrics returned by compact() are displayed as the cell output.
compaction_metrics = dtable.optimize.compact()
compaction_metrics

In [None]:
# vacuum() is a dry run by default (made explicit here): it only returns the paths of
# files that *would* be deleted. Files replaced by the compaction above are removed
# from the table, but they only become eligible for deletion once they are older than
# the retention period (7 days unless configured otherwise). Right after compacting,
# this list is therefore typically empty.
dtable.vacuum(dry_run=True)