## Import Python Libraries

In [1]:
%pip install -r requirements.txt


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.1.2[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
import typing as T
from datetime import datetime, timezone

from boto_session_manager import BotoSesManager
from s3pathlib import S3Path, context
import polars as pl # dataframe manipulation
from tabulate import tabulate # pretty print dataframe

## Define S3 Storage Backend

In [3]:
# Named AWS CLI profile used to build the boto3 session for this notebook.
aws_profile = "bmt_app_dev_us_east_1"
bsm = BotoSesManager(profile_name=aws_profile)
# Hand the same boto3 session to s3pathlib so S3Path calls use this profile.
context.attach_boto_session(boto_ses=bsm.boto_ses)

# ``get_credentials()`` returns None when no credentials can be resolved;
# fail here with a clear message instead of an AttributeError further down.
_session_credentials = bsm.boto_ses.get_credentials()
if _session_credentials is None:
    raise RuntimeError(
        f"No AWS credentials could be resolved for profile {aws_profile!r}"
    )
# Take one immutable snapshot so access key, secret key and token are
# guaranteed to belong together. Reading the three attributes one by one from
# a refreshable credentials object can straddle a refresh.
credential = _session_credentials.get_frozen_credentials()

# Options passed to polars / delta-rs for every Delta table read and write.
# NOTE: this dict contains secrets -- never print or display it in a cell.
storage_options = {
    "AWS_REGION": bsm.aws_region,
    "AWS_ACCESS_KEY_ID": credential.access_key,
    "AWS_SECRET_ACCESS_KEY": credential.secret_key,
    # Lets delta-rs write to S3 without a locking provider. This is only safe
    # with a single writer at a time (see the delta-rs S3 docs).
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}
# The session token only exists for temporary (STS / SSO) credentials.
if credential.token:
    storage_options["AWS_SESSION_TOKEN"] = credential.token

# S3 layout: <bucket>/projects/learn_delta_py/mydb/<table>/
bucket = f"{bsm.aws_account_alias}-{bsm.aws_region}-data"
s3dir_db = S3Path(f"s3://{bucket}/projects/learn_delta_py/mydb/").to_dir()
s3dir_t_account = (s3dir_db / "accounts").to_dir()

def reset_db() -> None:
    """
    Reset the database by deleting the entire database S3 folder.

    Calls ``delete()`` on the module-level ``s3dir_db`` directory. The
    ``accounts`` table directory (``s3dir_t_account``) lives underneath it, so
    the table is removed together with the rest of the database folder.
    """
    s3dir_db.delete()

In [4]:
# Start from an empty database so the mode="append" write further down does
# not stack duplicate rows when the notebook is re-run.
reset_db()

## Define Data Schema 

In [5]:
# Column name -> polars dtype for the ``accounts`` table. DataFrames built
# with this schema take their column order from the order of the entries here.
account_schema = dict(
    account_id=pl.Utf8(),
    create_at=pl.Datetime(),
    update_at=pl.Datetime(),
    account_number=pl.Utf8(),
    account_type=pl.Utf8(),
    description=pl.Utf8(),
    # partition keys (filled in by add_partition_keys_for_accounts)
    year=pl.Utf8(),
    month=pl.Utf8(),
    day=pl.Utf8(),
)

def add_partition_keys_for_accounts(rows: T.List[T.Dict]):
    """
    Stamp every row with its ``year`` / ``month`` / ``day`` partition values.

    The values come from the ``year``, ``month`` and ``day`` attributes of the
    row's ``create_at`` value and are stored as strings; month and day are
    left-padded with zeros to two characters.

    The row dicts are modified in place and the same list object that was
    passed in is returned.
    """
    for record in rows:
        created = record["create_at"]
        record.update(
            year=str(created.year),
            month=str(created.month).zfill(2),
            day=str(created.day).zfill(2),
        )
    return rows

## Insert Two Rows 

In [6]:
def w1_create_accounts():
    """
    Seed the ``accounts`` Delta table with two rows (``acc-1`` and ``acc-2``).

    The rows get their partition keys from ``add_partition_keys_for_accounts``
    and are appended to the table at ``s3dir_t_account``, partitioned by
    year / month / day.
    """
    alice = {
        "account_id": "acc-1",
        "create_at": datetime(2021, 1, 1, tzinfo=timezone.utc),
        "update_at": datetime(2021, 1, 1, tzinfo=timezone.utc),
        "account_number": "1111-1111-1111",
        "account_type": "checking",
        "description": "Alice's Main checking account",
    }
    bob = {
        "account_id": "acc-2",
        "create_at": datetime(2021, 1, 2, tzinfo=timezone.utc),
        "update_at": datetime(2021, 1, 2, tzinfo=timezone.utc),
        "account_number": "2222-2222-2222",
        "account_type": "checking",
        "description": "Bob's Main checking account",
    }
    accounts_df = pl.DataFrame(
        add_partition_keys_for_accounts([alice, bob]),
        schema=account_schema,
    )
    write_options = {"partition_by": ["year", "month", "day"]}
    accounts_df.write_delta(
        s3dir_t_account.uri,
        mode="append",
        delta_write_options=write_options,
        storage_options=storage_options,
    )

w1_create_accounts()

Examine Results

In [7]:
def pprint_df(df: pl.DataFrame):
    """
    Print ``df`` as a grid-style text table, using its column names as headers.
    """
    column_names = list(df.schema)
    rendered = tabulate(df.to_dict(), headers=column_names, tablefmt="grid")
    print(rendered)
    

def query_accounts():
    """
    Read the ``accounts`` Delta table, order it by ``create_at``, and print it.
    """
    lazy_accounts = pl.scan_delta(
        s3dir_t_account.uri,
        storage_options=storage_options,
    )
    accounts_df = lazy_accounts.sort(by="create_at").collect()
    pprint_df(accounts_df)

query_accounts()

+--------------+---------------------+---------------------+------------------+----------------+-------------------------------+--------+---------+-------+
| account_id   | create_at           | update_at           | account_number   | account_type   | description                   |   year |   month |   day |
| acc-1        | 2021-01-01 00:00:00 | 2021-01-01 00:00:00 | 1111-1111-1111   | checking       | Alice's Main checking account |   2021 |      01 |    01 |
+--------------+---------------------+---------------------+------------------+----------------+-------------------------------+--------+---------+-------+
| acc-2        | 2021-01-02 00:00:00 | 2021-01-02 00:00:00 | 2222-2222-2222   | checking       | Bob's Main checking account   |   2021 |      01 |    02 |
+--------------+---------------------+---------------------+------------------+----------------+-------------------------------+--------+---------+-------+


## Do Upsert: Update 1 Row, Insert 1 Row

In [8]:
def w2_update_accounts():
    """
    Upsert two rows into the ``accounts`` Delta table.

    Rows are matched between source and target on ``account_id``. ``acc-2``
    was written earlier, so its row is updated (new ``update_at`` and
    ``description``). ``acc-3`` has no match, so it is inserted.
    """
    bob_updated = {
        "account_id": "acc-2",
        "account_number": "2222-2222-2222",
        "create_at": datetime(2021, 1, 2, tzinfo=timezone.utc),
        "update_at": datetime(2021, 1, 3, tzinfo=timezone.utc),
        "account_type": "checking",
        "description": "Bob's Main checking account, updated",
    }
    cathy = {
        "account_id": "acc-3",
        "account_number": "3333-3333-3333",
        "create_at": datetime(2021, 1, 3, tzinfo=timezone.utc),
        "update_at": datetime(2021, 1, 3, tzinfo=timezone.utc),
        "account_type": "saving",
        "description": "Cathy's Main saving account",
    }
    upsert_df = pl.DataFrame(
        add_partition_keys_for_accounts([bob_updated, cathy]),
        schema=account_schema,
    )
    # "s" is the incoming DataFrame (source), "t" is the existing table (target).
    merge_options = {
        "predicate": "s.account_id = t.account_id",
        "source_alias": "s",
        "target_alias": "t",
    }
    merger = upsert_df.write_delta(
        s3dir_t_account.uri,
        mode="merge",
        delta_write_options={"partition_by": ["year", "month", "day"]},
        delta_merge_options=merge_options,
        storage_options=storage_options,
    )
    # mode="merge" only returns a merge builder; the table is not touched
    # until execute() is called.
    merger = merger.when_matched_update_all()  # matched rows: update
    merger = merger.when_not_matched_insert_all()  # unmatched rows: insert
    merger.execute()

w2_update_accounts()

Examine Results

In [9]:
# Re-read the table to check the upsert: acc-2 should carry the updated
# description / update_at, and acc-3 should now be present.
query_accounts()

+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
| account_id   | create_at           | update_at           | account_number   | account_type   | description                          |   year |   month |   day |
| acc-1        | 2021-01-01 00:00:00 | 2021-01-01 00:00:00 | 1111-1111-1111   | checking       | Alice's Main checking account        |   2021 |      01 |    01 |
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
| acc-2        | 2021-01-02 00:00:00 | 2021-01-03 00:00:00 | 2222-2222-2222   | checking       | Bob's Main checking account, updated |   2021 |      01 |    02 |
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
| acc-3        | 2021-