# Polars Deep Dive for Seder

这个 Notebook 是我在开发这个库之前, 事先做的一些深入研究. 根据这个研究的结果, 我们可以看到 polars 有哪些功能, 有哪些局限性, 从而决定这个库的最终设计思路.

In [59]:
import typing as T
import polars as pl
from rich import print as rprint

In [60]:
def print_df(df: pl.DataFrame):
    print("-------------------- schema --------------------")
    rprint(df.schema)

    print("-------------------- df records --------------------")
    for ith, record in enumerate(df.to_dicts(), start=1):
        rprint(record)

## POC

### Data and Schema Mismatch

研究当构造 DataFrame 时的数据跟 schema 不匹配时 polars 的行为.

In [61]:
df = pl.DataFrame(
    [
        {"Data": {"id": 1}},
        {"Data": {"id": 2}},
    ],
    schema={
        "Data": pl.Struct({
            "id": pl.Int64(),
        }),
    },
)
print_df(df)

-------------------- schema --------------------


-------------------- df records --------------------


在显式给定 Schema 的情况下, 如果数据类型不对则会自动 Cast.

In [62]:
df = pl.DataFrame(
    [
        {"Data": {"id": 1}},
        {"Data": {"id": 2.0}},
    ],
    schema={
        "Data": pl.Struct({
            "id": pl.Int64(),
        }),
    },
)
print_df(df)

-------------------- schema --------------------


-------------------- df records --------------------


In [63]:
df = pl.DataFrame(
    [
        {"Data": {"id": 1}},
        {"Data": {"id": 2.5}},
    ],
    schema={
        "Data": pl.Struct({
            "id": pl.Int64(),
        }),
    },
)
print_df(df)

-------------------- schema --------------------


-------------------- df records --------------------


如果值是 None 则不会自动 Cast, 结果还是 None.

In [64]:
df = pl.DataFrame(
    [
        {"Data": {"id": 1}},
        {"Data": {"id": None}},
    ],
    schema={
        "Data": pl.Struct({
            "id": pl.Int64(),
        }),
    },
)
print_df(df)

-------------------- schema --------------------


-------------------- df records --------------------


如果自动 Cast 不了, 则值设为 None.

In [65]:
df = pl.DataFrame(
    [
        {"Data": {"id": 1}},
        {"Data": {"id": "id-2"}},
    ],
    schema={
        "Data": pl.Struct({
            "id": pl.Int64(),
        }),
    },
)
print_df(df)

-------------------- schema --------------------


-------------------- df records --------------------


对于复杂结构 struct, 你只要在 schema 中定义了值, 那么就会尝试到对应位置找值, 只要找不到都算做 None. 对于一个 field 是 struct 的情况, 如果 struct 本身是 None, 那么这个 field 的值就是 None. 而 struct 本身是一个 dict, 只是里面的值找不到, 那么这个 field 的值就是里面定义的那些 key, 凡是找不到值的都设为 None.

In [66]:
df = pl.DataFrame(
    [
        {"Data": {"id": 1, "a_struct": {"name": "Alice", "age": 20}}},
        {"Data": {"id": 2, "a_struct": {"name": None, "age": None}}},
        {"Data": {"id": 3, "a_struct": {}}},
        {"Data": {"id": 4, "a_struct": None}},
    ],
    schema={
        "Data": pl.Struct({
            "id": pl.Int64(),
            "a_struct": pl.Struct({
                "name": pl.Utf8(),
                "age": pl.Int64(),
            })
        }),
    },
)
print_df(df)

-------------------- schema --------------------


-------------------- df records --------------------


对于 List 也是一样, 它会根据 schema 中的定义尝试 cast. 当 field 的值本身是 None 那么整个 field 就是 None. 如果 field 的值是 list 但是里面的 element 的 type 不对, 那么会尝试 cast 转换. 如果转换不了, 再看参数 strict. 如果是 strict = True, 则直接报错, 反之则复制 None 然后让他过.

In [67]:
df = pl.DataFrame(
    [
        {"Data": {"id": 1, "a_int_list": [1, 2]}},
        {"Data": {"id": 2, "a_int_list": [1, None]}},
        {"Data": {"id": 3, "a_int_list": ["1", 2]}},
        {"Data": {"id": 4, "a_int_list": ["alice", 2]}},
        {"Data": {"id": 5, "a_int_list": [None, None]}},
        {"Data": {"id": 6, "a_int_list": None}},
    ],
    schema={
        "Data": pl.Struct({
            "id": pl.Int64(),
            "a_int_list": pl.List(pl.Int64())
        }),
    },
    strict=False,
)
print_df(df)

-------------------- schema --------------------


-------------------- df records --------------------


**结论**

只要你定义了 Schema 并且 strict 模式是 False, 那么它是一定会将数据加载到 DataFrame 中的. 换言之, 数据类型不对的数据会自动被清洗掉.

### Serialization 的可能性

在这个例子中, 我们的 DynamoDB 数据中 id 是 hash key, name 是 nullable 的 普通 attribute. 

考虑如下两个 Python 中的字典数据:
 
1. ``{"id": 1, "name": "Alice"}``
2. ``{"id": 1, "name": None}``. 

这两种情况下对应的 DynamoDB 是不同的, 分别是:
 
1. ``{"id": {"N": "1"}, "name": {"S": "Alice"}}``
2. ``{"id": {"N": "1"}, "name": {"NULL": True}}``, 或者是 ``{"id": {"N": "1"}`` (压根没有这个 Attribute)

注意, ``{"name": None}`` 这不是一个合法的 DynamoDB Json, 你把这个数据发给 DynamoDB API 会报错的.

而在 polars 中对于 Map 类型是不支持 Dynamic Mapping 的, 因为允许不同的 Key 会导致无法预测数据类型, 向量化计算也就无法进行了. 我们期待 name 既可以对应 ``{"S": "Alice"}`` 又可以是 ``{"NULL": True}``, **而这是不被允许的**. 如果你尝试用 ``pl.when(...).then(...).otherwise(...)`` 来实现就会发现不管 name 是什么值, 它的输出永远同时包含 ``{"name": {"S": ..., "NULL": ...}}`` 两个 key. 这是因为 [polars.when](https://docs.pola.rs/api/python/stable/reference/expressions/api/polars.when.html) 永远会 evaluate then 和 otherwise 两个语句中的值, 然后之后再进行过滤, 所以返回的 struct 里永远有两个 key. 这样就导致你是不可能用 polars 来将普通 Python 字典转换为 DynamoDB 的数据格式的.

In [68]:
df = pl.DataFrame(
    [
        {"Data": {"id": 1, "name": "Alice"}},
        {"Data": {"id": 2, "name": None}},
    ],
    schema={
        "Data": pl.Struct({
            "id": pl.Int64(),
            "name": pl.Utf8(),
        }),
    },
)
records = df.to_dicts()
rprint(records)

In [69]:
# this is not working
records = df.select(
    (
        pl.when(pl.col("Data").struct.field("name").is_null())
        .then(
            pl.struct(pl.lit(True).alias("NULL"))
        )
        .otherwise(
            pl.struct(pl.col("Data").struct.field("name").alias("N"))
        )
        .alias("id")
    ),
).to_dicts()
rprint(records)

In [70]:
# this is not working too
records = df.select(
    (
        pl.when(pl.col("Data").struct.field("name").is_null())
        .then(
            pl.lit(None)
        )
        .otherwise(
            pl.struct(pl.col("Data").struct.field("name").alias("S"))
        )
        .alias("id")
    ),
).to_dicts()
rprint(records)

上面的现象的原因是当你 select 的时候, 如果某个 column 你将其定义为一个 ``pl.Struct({"key": pl.DataType()})``, 那么当这个 key 的值找不到的时候, 它会返回一个 ``{"key": None}`` 而不是一个完整的 None.

**结论**

你想要根据 name 的值的不同, 在有值的时候返回 ``{"S": "alice"}`` 没有值的时候返回 ``{"NULL": True}`` 是无法用 polars 做到的.

**一点小思路**

我们可以通过一些小的技巧来绕开这个问题. 例如你可以为你的每个 field 定义一个 "fill null value". 比如如果 name 是 string, 没有值的话就填 "UNKNOWN NAME", 如果是一个 list field 就填空列表 ``[]``. 这样就能保证你在 convert 的时候都能成功了.

从应用场景来看, 将普通字典批量转化成 DynamoDB json 只有一个应用场景, 就是手动构造大量数据, 然后用 import table 的功能从数据文件中创建一个新表, 这样做比你创建表之后 batch insert 要快得多也要便宜的多.

下面我们给出了一个例子.

In [71]:
df = pl.DataFrame(
    [
        {"Data": {"id": 1, "name": "Alice"}},
        {"Data": {"id": 2, "name": None}},
    ],
    schema={
        "Data": pl.Struct({
            "id": pl.Int64(),
            "name": pl.Utf8(),
        }),
    },
)
records = df.select(
    pl.struct(
        pl.col("Data").struct.field("id").fill_null(-999).cast(pl.Utf8).alias("N")
    ).alias("id"),
    pl.struct(
        pl.col("Data").struct.field("name").fill_null("UNKNOWN NAME").alias("S")
    ).alias("name"),
).to_dicts()
rprint(records)

### Fill Null

上一节提到, 实现 Serialization 的关键技巧就是 fill null value. 下面的例子测试了各种 fill null value 的情况.

In [72]:
df = pl.DataFrame(
    [
        {"Json": {"a_int": 123, "a_str_list": ["a", "b", "c"], "a_struct": {"id": 1}}},
        {"Json": {"a_int": None, "a_str_list": None, "a_struct": None}},
    ],
    schema={
        "Json": pl.Struct({
            "a_int": pl.Int64(),
            "a_str_list": pl.List(pl.Utf8()),
            "a_struct": pl.Struct({
                "id": pl.Int64(),
            }),
        }),
    },
)
print_df(df)

df_res = df.select(
    pl.struct(
        pl.col("Json").struct.field("a_int").fill_null(-999).alias("a_int"),
        pl.col("Json").struct.field("a_str_list").fill_null([]).alias("a_str_list"),
        pl.col("Json").struct.field("a_struct").fill_null(pl.lit(None)).alias("a_struct"),
    ).alias("Json")
)
print_df(df_res)

-------------------- schema --------------------


-------------------- df records --------------------


-------------------- schema --------------------


-------------------- df records --------------------


### Deserialize 时找不到值的问题

Deserialize 的本质就是从 DynamoDB JSON 中提取值, 并对根据数据类型其进行转化. 但有的时候就会出现你无法提取值的情况. 本节就来好好研究在各种情况下 polars 的行为, 这样我们在实现 deserializer 的时候就能合理规避这些问题.

在这个例子中, polars 通过 infer schema 自动推断出 a_int 的 schema 是 ``Struct({"N": Int64(), "NULL": "Boolean"})``, 所以我们的 ``Item.a_int.N`` 在 a_int 没有值时候能获得 None.


In [73]:
df = pl.DataFrame(
    [
        {"Item": {"a_int": {"N": "1"}}},
        {"Item": {"a_int": {"NULL": True}}},
    ],
)
print_df(df)

df_res = df.select(
    pl.col("Item").struct.field("a_int").struct.field("N").cast(pl.Int64).alias("a_int")
)
print_df(df_res)

-------------------- schema --------------------


-------------------- df records --------------------


-------------------- schema --------------------


-------------------- df records --------------------


在这个例子中就不行了. polars 无法访问没有在 schema 中被定义的值.

In [74]:
df = pl.DataFrame(
    [
        {"Json": {"a_int": {"NULL": True}}},
    ],
)
print_df(df)

try:
    df_res = df.select(
        pl.col("Json").struct.field("a_int").struct.field("N").cast(pl.Int64).alias("a_int")
    )
    print_df(df_res)
except Exception as e:
    print(repr(e))

-------------------- schema --------------------


-------------------- df records --------------------


StructFieldNotFoundError('N')


正确的做法是显式地定义 schema, 这样 polars 就能正确地访问到值了.

In [75]:
df = pl.DataFrame(
    [
        {"Json": {"a_int": {"NULL": True}}},
    ],
    schema={
        "Json": pl.Struct({
            "a_int": pl.Struct({
                "N": pl.Utf8(),
            }),
        }),
    },
)
print_df(df)

df_res = df.select(
    pl.col("Json").struct.field("a_int").struct.field("N").cast(pl.Int64).alias("a_int")
)
print_df(df_res)

-------------------- schema --------------------


-------------------- df records --------------------


-------------------- schema --------------------


-------------------- df records --------------------


## Deserialize (DynamoDB JSON to Python Dict)

### Deserialize a List

In [76]:
df = pl.DataFrame(
    [
        {"Item": {"a_int_list": [{"N": "1"}, {"N": "2"}, {"N": "3"}]}},
        {"Item": {"a_int_list": [{"NULL": True}, {"NULL": True}, {"NULL": True}]}},
        {"Item": {"a_int_list": {"NULL": True}}},
    ],
    schema={
        "Item": pl.Struct({"a_int_list": pl.List(pl.Struct({"N": pl.Utf8()}))}),
    }
)
print_df(df)

-------------------- schema --------------------


-------------------- df records --------------------


In [77]:
df_res = df.with_columns(
    pl.col("Item").struct.field("a_int_list").list.eval(
        pl.element().struct.field("N")
    ).alias("a_int_list")
).drop("Item")
print_df(df_res)

-------------------- schema --------------------


-------------------- df records --------------------


### Deserialize a Struct

In [78]:
df = pl.DataFrame(
    [
        {"Item": {"a_struct": {"M": {"id": {"N": "1"}, "name": {"S": "Alice"}}}}},
        {"Item": {"a_struct": {"M": {"id": {"Null": True}, "name": {"Null": True}}}}},
        {"Item": {"a_struct": {"Null": True}}},
    ],
    schema={
        "Item": pl.Struct({
            "a_struct": pl.Struct({
                "M": pl.Struct({
                    "id": pl.Struct({"N": pl.Utf8()}),
                    "name": pl.Struct({"S": pl.Utf8()})
                })
            })
        }),
    }
)
print_df(df)

-------------------- schema --------------------


-------------------- df records --------------------


In [79]:
df_res = df.with_columns(
    pl.struct(
        pl.col("Item").struct.field("a_struct").struct.field("M").struct.field("id").struct.field("N").cast(
            pl.Int64).alias("id"),
        pl.col("Item").struct.field("a_struct").struct.field("M").struct.field("name").struct.field("S").alias("name"),
    ).alias("a_struct"),
).drop("Item")
print_df(df_res)

-------------------- schema --------------------


-------------------- df records --------------------


## Serialize (Python Dict to DynamoDB JSON)

### Serialize Generic Python Object

Integer, Float, String, Binary, Bool, Null. 


In [80]:
df = pl.DataFrame(
    [
        {
            "Data": {
                "a_int": 123,
                "a_float": 1.23,
                "a_str": "Alice",
                "a_bin": b"hello",
                "a_bool": False,
                "a_null": None,
            }
        },
        {
            "Data": {
                "a_int": None,
                "a_float": None,
                "a_str": None,
                "a_bin": None,
                "a_bool": None,
                "a_null": None,
            }
        },
    ],
    schema={
        "Data": pl.Struct(
            {
                "a_int": pl.Int64(),
                "a_float": pl.Float64(),
                "a_str": pl.Utf8(),
                "a_bin": pl.Binary(),
                "a_bool": pl.Boolean(),
                "a_null": pl.Null(),
            }
        ),
    },
    strict=False,
)
print(df.shape)
print_df(df)

df_res = df.with_columns(
    pl.struct(
        pl.col("Data").struct.field("a_int").fill_null(-999).cast(pl.Utf8).alias("N"),
    ).alias("a_int"),
    pl.struct(
        pl.col("Data").struct.field("a_float").fill_null(-999.999).cast(pl.Utf8).alias("N"),
    ).alias("a_float"),
    pl.struct(
        pl.col("Data").struct.field("a_str").fill_null("NA").cast(pl.Utf8).alias("S"),
    ).alias("a_str"),
    pl.struct(
        pl.col("Data").struct.field("a_bin").fill_null(b"NA").bin.encode("base64").cast(pl.Utf8).alias("B"),
    ).alias("a_bin"),
    pl.struct(
        pl.col("Data").struct.field("a_bool").fill_null(False).alias("BOOL"),
    ).alias("a_bool"),
    pl.struct(
        pl.lit(True).alias("NULL"),
    ).alias("a_null"),
).drop("Data")
print_df(df_res)

(2, 1)
-------------------- schema --------------------


-------------------- df records --------------------


-------------------- schema --------------------


-------------------- df records --------------------


### Serialize a List of Integer

In [81]:
df = pl.DataFrame(
    [
        {"Data": {"a_int_list": [1, 2, 3]}},
        {"Data": {"a_int_list": [None, None, None]}},
        {"Data": {"a_int_list": None}},
    ],
    schema={
        "Data": pl.Struct({"a_int_list": pl.List(pl.Int64())}),
    }
)
print_df(df)

df_res = df.with_columns(
    pl.struct(
        pl.col("Data").struct.field("a_int_list").fill_null([]).list.eval(
            pl.struct(
                pl.element().fill_null(-999).cast(pl.Utf8()).alias("N")
            )
        ).alias("L")
    ).alias("a_int_list")
).drop("Data")
print_df(df_res)

-------------------- schema --------------------


-------------------- df records --------------------


-------------------- schema --------------------


-------------------- df records --------------------


### Serialize a List of Null

In [82]:
df = pl.DataFrame(
    [
        {"Data": {"a_null_list": [None, None, None]}},
        # {"Data": {"a_null_list": None}},
    ],
    schema={
        "Data": pl.Struct({"a_null_list": pl.List(pl.Null())}),
    }
)
print_df(df)

df_res = df.with_columns(
    pl.struct(
        pl.col("Data").struct.field("a_null_list").fill_null([]).list.eval(
            pl.struct(pl.element().fill_null(value=True).alias("NULL"))
        ).alias("L")
    ).alias("a_null_list")
).drop("Data")
print_df(df_res)

-------------------- schema --------------------


-------------------- df records --------------------


-------------------- schema --------------------


-------------------- df records --------------------


### Serialize a Struct

In [83]:
df = pl.DataFrame(
    [
        {"Data": {"a_struct": {"id": 1, "name": "Alice"}}},
        {"Data": {"a_struct": {"id": None, "name": None}}},
        {"Data": {"a_struct": None}},
    ],
    schema={
        "Data": pl.Struct({"a_struct": pl.Struct({"id": pl.Int64(), "name": pl.Utf8()})}),
    }
)
print_df(df)

df_res = df.with_columns(
    pl.struct(
        pl.struct(
            pl.struct(
                pl.col("Data").struct.field("a_struct").struct.field("id").fill_null(-999).cast(pl.Utf8).alias("N")
            ).alias("id"),
            pl.struct(
                pl.col("Data").struct.field("a_struct").struct.field("name").fill_null("NA").alias("S")
            ).alias("name"),
        ).alias("M")
    ).alias("a_struct")
).drop("Data")
print_df(df_res)

-------------------- schema --------------------


-------------------- df records --------------------


-------------------- schema --------------------


-------------------- df records --------------------


### Serialize a Struct with List

In [84]:
df = pl.DataFrame(
    [
        {"Data": {"a_struct": {"a_int_list": [1, 2, 3]}}},
        {"Data": {"a_struct": {"a_int_list": [1, None, 3]}}},
        {"Data": {"a_struct": {"a_int_list": [None, None, None]}}},
        {"Data": {"a_struct": {"a_int_list": None}}},
        {"Data": {"a_struct": None}},
    ],
    schema={
        "Data": pl.Struct({"a_struct": pl.Struct({"a_int_list": pl.List(pl.Int64())})}),
    }
)
print_df(df)

df_res = df.with_columns(
    pl.struct(
        pl.struct(
            pl.struct(
                pl.col("Data").struct.field("a_struct").struct.field("a_int_list").fill_null([]).list.eval(
                    pl.struct(
                        pl.element().fill_null(-999).cast(pl.Utf8()).alias("N")
                    )
                )
            ).alias("a_int_list"),
        ).alias("M")
    ).alias("a_struct")
).drop("Data")
print_df(df_res)

-------------------- schema --------------------


-------------------- df records --------------------


-------------------- schema --------------------


-------------------- df records --------------------
