In [2]:
import polars as pl
import polars.selectors as cs

In [3]:
# Load the raw newline-delimited JSON export. The path is relative to the
# notebook's working directory, so the file must exist there before this runs.
ndjson_path = '../data/bamboo.ndjson'
df = pl.read_ndjson(ndjson_path)

FileNotFoundError: No such file or directory (os error 2): ../data/bamboo.ndjson

In [None]:
from zlib import crc32

def assign_split(column: str) -> pl.Expr:
    """Build an expression labelling each row "train", "validate" or "test".

    The value of ``column`` is cast to a string, hashed with CRC-32 and scaled
    by ``1 / 2**32``; the label is chosen from the range the scaled hash falls
    in, so the label depends only on the key value.

    Args:
        column: Name of the column whose values drive the assignment.

    Returns:
        A polars expression yielding "train" for a scaled hash between 0.0 and
        0.6, "validate" between 0.6 and 0.8 and "test" between 0.8 and 1.0.
        Branches are tested in that order, so the first match wins; rows that
        match no branch fall back to "train".
    """

    def _crc_as_float(text):
        # The mask keeps the checksum within the unsigned 32-bit range.
        return float(crc32(str.encode(text)) & 0xFFFFFFFF)

    # NOTE(review): Float32 carries 24 significant bits, so the 32-bit checksum
    # is rounded before scaling — confirm this precision is acceptable.
    hashed = pl.col(column).cast(pl.String).map_elements(_crc_as_float, return_dtype=pl.Float32)
    seed = hashed.mul(1 / 2**32)

    in_train = seed.is_between(0.0, 0.6)
    in_validate = seed.is_between(0.6, 0.8)
    in_test = seed.is_between(0.8, 1.0)

    labelled = pl.when(in_train).then(pl.lit("train"))
    labelled = labelled.when(in_validate).then(pl.lit("validate"))
    labelled = labelled.when(in_test).then(pl.lit("test"))
    return labelled.otherwise(pl.lit("train"))

In [None]:
# Build the "dynamic" table: one row per bureau_search_ref in which every
# exploded (per-list-element) field is collected back into a list ordered by
# statement_tenure.
dynamic = (
    df.select(
        "bureau_search_ref",
        # Remove the first "+00" from the timestamp text so it parses with the
        # format below. Raw string: "\+" is not a valid Python escape sequence
        # and raises a SyntaxWarning in a normal string literal.
        pl.col("created_at").str.replace(r"\+00", "").str.to_date(format="%Y-%m-%d %H:%M:%S%.f"),
        # every list-of-string and list-of-int64 column
        cs.by_dtype(pl.List(pl.String)) | cs.by_dtype(pl.List(pl.Int64)),
        target=pl.col("irb"),
    )
    # one row per list element; the three scalar columns are repeated
    .explode(pl.exclude("created_at","bureau_search_ref", "target"))
    .select(
        # non-string columns, minus account_index which is re-derived below
        cs.all() - cs.string() - cs.by_name("account_index"),
        # string columns (except "m") with empty strings turned into nulls
        (cs.string() - cs.by_name("m")).replace("", None),
        statement_id=pl.concat_str("accgroupid", pl.lit("$"), "m"),
        account_open=pl.col("accstartdate").str.to_date(),
        account_index=pl.col("accgroupid").cast(pl.String),
        statement_date=pl.col("m").str.to_date(format="%Y-%m"),
    )
    # filter out statements of accounts with empty balances
    # (rows whose bal is null are dropped as well, since the predicate is null)
    .filter(pl.col("bal") > 0)
    .select(
        "payamt", "bal", "history_limit", "bureau_search_ref", "pay", "acctypecode", "target", "statement_id", "account_index",
        # calculate two duration fields (time diff between statement and application date, statement and account open date)
        # both are whole days relative to created_at, negative when the date precedes it
        statement_tenure=(pl.col("statement_date") - pl.col("created_at")).dt.total_days(),
        account_tenure=(pl.col("account_open") - pl.col("created_at")).dt.total_days(),
    )
    .sort("bureau_search_ref", "statement_tenure")
    # maintain_order keeps the groups in the sorted key order, so the resulting
    # row order is the same on every run
    .group_by("bureau_search_ref", maintain_order=True)
    .agg(cs.all())
)

In [None]:
# Build the "static" table: every string column except loan_account_ref (with
# empty strings turned into nulls), every integer column, and the split label
# derived from bureau_search_ref.
blank_to_null = (cs.string() - cs.by_name("loan_account_ref")).replace("", None)
split_label = assign_split("bureau_search_ref")
static = df.select(blank_to_null, cs.integer(), split=split_label)

In [None]:
# Attach the static columns to each aggregated row; only keys present in both
# frames are kept (inner join, which is also the default).
ledger = dynamic.join(
    static,
    on="bureau_search_ref",
    how="inner",
)

In [None]:
# Map every static column to a coarse kind: String columns become "category",
# all other dtypes become "number".
out = {
    field: ("category" if dtype == pl.String else "number")
    for field, dtype in static.schema.items()
}

In [None]:
# Show the distinct values found in the "nob" column.
static.get_column("nob").unique().to_list()

In [None]:
# Write the ledger as numbered Avro shards of at most 10 rows each.
shards = ledger.iter_slices(10)
for index, shard in enumerate(shards):
    target = f"../data/bamboo/bamboo-{index}.avro"
    shard.write_avro(target, name="bamboo")

In [None]:
# Histogram of how many entries each row's "bal" list holds.
# NOTE(review): the series is aliased "acctypecode" although it contains list
# lengths of "bal" — confirm the name is intended.
bal_lengths = dynamic.select(acctypecode=pl.col("bal").list.len())
bal_lengths.get_column("acctypecode").plot.hist()