In [1]:

%load_ext autoreload
%autoreload 2

In [3]:
import os
from glob import glob
from typing import List

import dotenv
import opendal
import polars as pl
import pyarrow as pa
import polars.selectors as s
from pyarrow import fs
from pyarrow.dataset import dataset
from deltalake import DeltaTable
pl.Config().set_tbl_cols(-1)

# from src.helpers.utils import create_date_directories
# from src.scripts.io import sink_to_s3



polars.config.Config

In [3]:
# Example usage
base_path = "../data/log_content/"
s3_path = "/data/log_content"
start_date = "20220401"
end_date = "20220430"
directory_paths = create_date_directories(base_path, start_date, end_date)
s3_paths = create_date_directories(s3_path, start_date, end_date)

In [None]:
def ingest_log():
    dotenv.load_dotenv()
    access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
    secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
    region = os.getenv("AWS_REGION")
    op = opendal.Operator(
        "s3",
        root='/log_content/',
        bucket='data',
        region=region,
        endpoint='http://127.0.0.1:9000',
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
    )

    return op

In [4]:

def ingest_by_pyarrow(paths: str):
    dotenv.load_dotenv()
    cloudfs = fs.S3FileSystem(
        access_key=os.getenv("AWS_ACCESS_KEY_ID"),
        secret_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
        region=os.getenv("AWS_REGION"),
        endpoint_override='http://127.0.0.1:9000',
    )
    schema = pa.schema([
        pa.field('_index', pa.string()),
        pa.field('_type', pa.string()),
        pa.field('_id', pa.string()),
        pa.field('_score', pa.int64()),
        pa.field('_source',
                 pa.struct(
                     [
                         pa.field('Contract', pa.string()),
                         pa.field('Mac', pa.string()),
                         pa.field('TotalDuration', pa.int64()),
                         pa.field('AppName', pa.string()),
                     ]
                 )
                 )
    ]
    )
    s3_files = [
        entry.path
        for entry in cloudfs.get_file_info(fs.FileSelector(paths, recursive=True))
    ]

    df = []

    for path in s3_files:
        ds = dataset(
            source=path,
            schema=schema,
            filesystem=cloudfs,
            format='json',
        )
        df.append(
            pl.scan_pyarrow_dataset(ds)
            .with_columns(pl.Series("Date", [path]).str.extract(r"\d{8}", 0))
            .select(
                pl.col("Date").str.to_date("%Y %m %d"),
                pl.col("_index").alias("Index"),
                pl.col("_type").alias("Type"),
                pl.col("_id").alias("Id"),
                pl.col("_score").alias("Score"),
                pl.col("_source"),
            )
            .unnest("_source")
        )

    return pl.concat(df)


In [7]:
sources = (ingest_by_pyarrow("data/log_content/")
           .filter(
    pl.col("Date").is_between(pl.datetime(2022, 4, 1), pl.datetime(2022, 4, 3))
)
           .select(pl.col("Date"), pl.col("TotalDuration")))
sources.schema

OrderedDict([('Date', Date), ('TotalDuration', Int64)])

In [9]:
sink_to_s3(
    sources,
    path="data/log_content.parquet",
    partition_cols=["Date"],
    existing_data_behavior='delete_matching',
)

In [None]:

def ingest_by_pyarrow_batches(paths: List[str] = None):
    dotenv.load_dotenv()
    cloudfs = fs.S3FileSystem(
        access_key=os.getenv("AWS_ACCESS_KEY_ID"),
        secret_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
        region=os.getenv("AWS_REGION"),
        endpoint_override='http://127.0.0.1:9000',
    )

    schema = pa.schema([
        pa.field('_index', pa.string()),
        pa.field('_type', pa.string()),
        pa.field('_id', pa.string()),
        pa.field('_score', pa.int64()),
        pa.field('_source',
                 pa.struct(
                     [
                         pa.field('Contract', pa.string()),
                         pa.field('Mac', pa.string()),
                         pa.field('TotalDuration', pa.int64()),
                         pa.field('AppName', pa.string()),
                     ]
                 )
                 )
    ]
    )
    df = []

    for path in paths:
        ds = dataset(
            source=path,
            schema=schema,
            filesystem=cloudfs,
            format='json',
        )
        for batch in ds.to_batches():
            df.append(
                pl.from_arrow(batch)
                .lazy()
                .with_columns(pl.Series("Date", [path]).str.extract(r"\d{8}", 0))
            )

    return pl.concat(df)


In [None]:
ingest_by_pyarrow_batches(s3_paths)

# Ingest the logging data

In [4]:
def get_log_json(paths: str | list[str]) -> pl.LazyFrame:
    """
    Function to ingest the logging json data, get the filename and add a "Date" column
    Args:
        paths (str | list[str]): a list of path to data, this path must be in glob pattern

    Returns:
        pl.LazyFame
    """
    if isinstance(paths, str):
        paths = [paths]
    # Define schema of logging data
    schema = {
        "_index" : pl.String,
        "_type"  : pl.String,
        "_id"    : pl.String,
        "_score" : pl.Int64,
        "_source": pl.Struct(
            [
                pl.Field("Contract", pl.String),
                pl.Field("Mac", pl.String),
                pl.Field("TotalDuration", pl.Int64),
                pl.Field("AppName", pl.String),
            ]
        ),
    }

    def _scan_log(path, schema):
        return (
            pl.scan_ndjson(path, schema=schema, low_memory=True)
            .with_columns(pl.Series("Date", [path]).str.extract(r"\d{8}", 0))
            .select(
                pl.col("Date").str.to_date("%Y %m %d"),
                pl.col("_index").alias("Index"),
                pl.col("_type").alias("Type"),
                pl.col("_id").alias("Id"),
                pl.col("_score").alias("Score"),
                pl.col("_source"),
            )
            .unnest("_source")
        )

    dfs = []
    # Function to scan and preprocess a single log data
    try:
        dfs = [_scan_log(path, schema) for path in paths]
    except Exception as e:
        print(f"Failed to read data from {paths}", {e})

    # Concatenate the processed lazyframes
    return pl.concat(dfs, how="vertical").lazy()

In [5]:
sources = get_log_json(directory_paths)

In [None]:
pl.concat(
    pl.collect_all(
        [
            dfi.lazy()
            for dfi in sources.collect().partition_by(by="Date")
        ]
    )
)

# Get the RFM table

In [6]:
sources.columns

['Date',
 'Index',
 'Type',
 'Id',
 'Score',
 'Contract',
 'Mac',
 'TotalDuration',
 'AppName']

In [15]:
reported_date = "20220501"


def get_rfm_table(sources: pl.LazyFrame, reported_date: str = "20220501", total_date: int = 30) -> pl.LazyFrame:
    b: pl.LazyFrame = (
        sources.group_by("Contract")
        .agg(
            pl.col("Date").max().alias("LatestDate")
        )
    )

    temp = (
        sources.with_columns(
            pl.lit(reported_date).str.to_date("%Y %m %d").alias("ReportedDate")
        )
    )

    rfm = (
        temp
        .filter(pl.col("Contract").str.len_chars() > 1)
        .join(b, on="Contract", how='left')
        .group_by("Contract")
        .agg(
            (pl.col("ReportedDate") - pl.col("LatestDate")).min().alias("Recency"),
            (pl.col("Date").n_unique() / pl.lit(total_date) * 100.0).round(2).alias("Frequency"),
            pl.sum("TotalDuration").alias("Monetary"),
        )
        .with_columns(
            pl.col("Recency").qcut(3, labels=["1", "2", "3"], allow_duplicates=True).alias("R"),
            pl.col("Frequency").qcut(3, labels=["1", "2", "3"], allow_duplicates=True).alias("F"),
            pl.col("Monetary").qcut(3, labels=["1", "2", "3"], allow_duplicates=True).alias("M"),
        )
        .select(
            pl.col("Contract"),
            pl.concat_str([pl.col("R"), pl.col("F"), pl.col("M")]).alias("RFM"),
            pl.lit(reported_date).str.strptime(pl.Date, format="%Y%m%d").alias("ReportedDate")
        )
        .sort("RFM")
    )

    return rfm



In [16]:
get_rfm_table(sources).fetch(100)

RFM,ReportedDate
str,date
"""111""",2022-05-01
"""111""",2022-05-01
"""111""",2022-05-01
"""111""",2022-05-01
"""111""",2022-05-01
…,…
"""313""",2022-05-01
"""313""",2022-05-01
"""313""",2022-05-01
"""313""",2022-05-01


In [None]:
app_names = [
    "CHANNEL",
    "KPLUS",
    "VOD",
    "FIMS",
    "BHD",
    "SPORT",
    "CHILD",
    "RELAX",
]

column_names = [
    "TVDuration",
    "TVDuration",
    "MovieDuration",
    "MovieDuration",
    "MovieDuration",
    "SportDuration",
    "ChildDuration",
    "RelaxDuration",
]


def get_pivot_data(sources: pl.LazyFrame, app_names: List[str], column_names: List[str]) -> pl.LazyFrame:
    if not isinstance(sources, pl.LazyFrame):
        sources = sources.lazy()

    if len(app_names) != len(column_names):
        raise ValueError("The lengths of app_names and column_names must be the same")

    mapping = dict(zip(app_names, column_names))
    pivot_df: pl.LazyFrame = (
        sources.select(
            pl.col("Contract"),
            pl.col("TotalDuration"),
            pl.col("AppName").replace(mapping, default="Unknown").alias("Type")
        )
        .filter(
            (pl.col("Contract").str.len_chars() > 1)
            & (pl.col("Type") != "Unknown")
            & (pl.col("TotalDuration") > 0)
        )
        .group_by(["Contract"])
        .agg(
            [
                pl.when(pl.col("Type") == y).then(pl.col("TotalDuration")).sum().alias(y)
                for y in set(column_names)
            ]
        )
        .sort(["Contract", "TVDuration"])
    )

    return pivot_df

In [None]:
def get_most_watch(sources: pl.LazyFrame) -> pl.LazyFrame:
    if not isinstance(sources, pl.LazyFrame):
        sources = sources.lazy()
    columns = sources.columns[1:]
    watch_type = [item[:-8] for item in columns]

    return (
        sources
        .with_columns(
            pl.concat_list([
                pl.struct(pl.col(c).alias("l"), pl.lit(v).alias("k")) for c, v in zip(columns, watch_type)
                for c, v in zip(columns, watch_type)
            ]
            ).alias('temp')
        )
        .select(
            pl.col('Contract'),
            pl.col('temp').list.sort(descending=True).list.first().struct.field('k').alias('MostWatch')
        )
    )


In [None]:
pivot_df = get_pivot_data(sources, app_names, column_names)

In [None]:
pivot_df.fetch(100)

In [None]:
get_most_watch(pivot_df).fetch(100)

In [None]:
(pivot_df.with_columns(
    pl.concat_list([pl.struct(pl.col(c).alias("l"), pl.lit(v).alias("k")) for c, v in zip(columns, watch_type)]).alias(
        'temp'
    )
).select(
    pl.col('Contract'),
    pl.col('temp').list.sort(descending=True).list.first().struct.field('k')
)
 .fetch())


In [None]:
columns

In [None]:
original_list = ['MovieDuration', 'TVDuration', 'RelaxDuration', 'ChildDuration', 'SportDuration']
filtered_list = [item for item in original_list if 'Duration' not in item]
filtered_list

# SCP Type 2 Supported Functions

In [1]:
from src.scripts.support import type2_scd_upsert_pl
import polars as pl

In [19]:
# test data 
sources_df = pl.DataFrame([
    {"pkey"    : 1, "attr1": "A", "attr2": "A", "is_current": True, "effective_time": "2019-01-01 00:00:00",
     "end_time": None},
    {"pkey"    : 2, "attr1": "B", "attr2": "B", "is_current": True, "effective_time": "2019-01-01 00:00:00",
     "end_time": None},
    {"pkey"    : 4, "attr1": "D", "attr2": "D", "is_current": True, "effective_time": "2019-01-01 00:00:00",
     "end_time": None},
]
)
updates_df = pl.DataFrame(
    [
        {"pkey": 2, "attr1": "Z", "attr2": "null", "effective_time": "2020-01-01 00:00:00"},
        {"pkey": 3, "attr1": "C", "attr2": "C", "effective_time": "2020-09-15 00:00:00"},
    ]
)

In [3]:
print(sources_df)
sources_df.cast({"end_time": pl.Datetime}).write_delta(target='./delta_lake1/product', mode="overwrite")

shape: (3, 6)
┌──────┬───────┬───────┬────────────┬─────────────────────┬──────────┐
│ pkey ┆ attr1 ┆ attr2 ┆ is_current ┆ effective_time      ┆ end_time │
│ ---  ┆ ---   ┆ ---   ┆ ---        ┆ ---                 ┆ ---      │
│ i64  ┆ str   ┆ str   ┆ bool       ┆ str                 ┆ null     │
╞══════╪═══════╪═══════╪════════════╪═════════════════════╪══════════╡
│ 1    ┆ A     ┆ A     ┆ true       ┆ 2019-01-01 00:00:00 ┆ null     │
│ 2    ┆ B     ┆ B     ┆ true       ┆ 2019-01-01 00:00:00 ┆ null     │
│ 4    ┆ D     ┆ D     ┆ true       ┆ 2019-01-01 00:00:00 ┆ null     │
└──────┴───────┴───────┴────────────┴─────────────────────┴──────────┘


In [20]:
updates_df = updates_df.with_columns(pl.col('effective_time').str.to_datetime())

In [9]:
print(updates_df)

shape: (2, 1)
┌─────────────────────┐
│ effective_time      │
│ ---                 │
│ datetime[μs]        │
╞═════════════════════╡
│ 2020-01-01 00:00:00 │
│ 2020-09-15 00:00:00 │
└─────────────────────┘


In [15]:
sources1_df = pl.scan_delta("./delta_lake1/product").with_columns(
    pl.col('effective_time').str.to_datetime(),
    pl.col('end_time').str.to_datetime(),
)

In [16]:
print(sources1_df.collect())


shape: (3, 6)
┌──────┬───────┬───────┬────────────┬─────────────────────┬──────────────┐
│ pkey ┆ attr1 ┆ attr2 ┆ is_current ┆ effective_time      ┆ end_time     │
│ ---  ┆ ---   ┆ ---   ┆ ---        ┆ ---                 ┆ ---          │
│ i64  ┆ str   ┆ str   ┆ bool       ┆ datetime[μs]        ┆ datetime[μs] │
╞══════╪═══════╪═══════╪════════════╪═════════════════════╪══════════════╡
│ 1    ┆ A     ┆ A     ┆ true       ┆ 2019-01-01 00:00:00 ┆ null         │
│ 2    ┆ B     ┆ B     ┆ true       ┆ 2019-01-01 00:00:00 ┆ null         │
│ 4    ┆ D     ┆ D     ┆ true       ┆ 2019-01-01 00:00:00 ┆ null         │
└──────┴───────┴───────┴────────────┴─────────────────────┴──────────────┘


In [21]:
type2_scd_upsert_pl(
    sources1_df,
    updates_df.lazy(),
    'pkey',
    "./delta_lake1/product",
    ['attr1', 'attr2'],
)

{'num_source_rows': 3,
 'num_target_rows_inserted': 1,
 'num_target_rows_updated': 2,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 2,
 'num_output_rows': 5,
 'num_target_files_added': 2,
 'num_target_files_removed': 1,
 'execution_time_ms': 203,
 'scan_time_ms': 0,
 'rewrite_time_ms': 27}

In [21]:
# new record
new_records = updates_df.lazy().join(sources_df.lazy(), on="pkey", how="anti").with_columns(pl.lit(True).alias("is_current"), pl.lit(None).alias("end_time").cast(pl.Datetime))
print(new_records)


naive plan: (run LazyFrame.explain(optimized=True) to see the optimized plan)

 WITH_COLUMNS:
 [true.alias("is_current"), null.alias("end_time").strict_cast(Datetime(Microseconds, None))]
  ANTI JOIN:
  LEFT PLAN ON: [col("pkey")]
    DF ["pkey", "attr1", "attr2", "effective_time"]; PROJECT */4 COLUMNS; SELECTION: "None"
  RIGHT PLAN ON: [col("pkey")]
    DF ["pkey", "attr1", "attr2", "is_current"]; PROJECT */6 COLUMNS; SELECTION: "None"
  END ANTI JOIN


In [25]:
print(new_records.collect().cast({"effective_time": pl.Datetime}))


ComputeError: conversion from `str` to `datetime[μs]` failed in column 'effective_time' for 1 out of 1 values: ["2020-09-15 00:00:00"]

You might want to try:
- setting `strict=False` to set values that cannot be converted to `null`
- using `str.strptime`, `str.to_date`, or `str.to_datetime` and providing a format string

In [20]:
# deleted records
(
    sources_df
    .filter(
        pl.col("is_current") == "True"
    )
    .join(
        other=updates_df,
        on="pkey",
        how="anti"
    )
)

pkey,attr1,attr2,is_current,effective_time,end_time
i64,str,str,bool,str,null


In [23]:
[(pl.col(f"{attr}") != pl.col(f"{attr}_right")) for attr in ['attr1', 'attr2']]

[<Expr ['[(col("attr1")) != (col("attr1…'] at 0x12423C190>,
 <Expr ['[(col("attr2")) != (col("attr2…'] at 0x12423FF40>]

In [43]:
opened_records = (sources_df
.join(updates_df, on="pkey", how="inner")
.filter(
    pl.any_horizontal(
        (pl.col("is_current") == "True"),
        *[(pl.col(f"{attr}") != pl.col(f"{attr}_right")) for attr in ['attr1', 'attr2']]
    )
)
.select(
    pl.col("pkey"),
    *[(pl.col(f"{attr}_right").alias(f"{attr}")) for attr in ['attr1', 'attr2']],
    pl.col("effective_time_right").alias("effective_time"),
    pl.lit(True).alias("is_current"),
    pl.lit(None).alias("end_time").cast(pl.String)
)
)
print(opened_records)

shape: (1, 6)
┌──────┬───────┬───────┬─────────────────────┬────────────┬──────────┐
│ pkey ┆ attr1 ┆ attr2 ┆ effective_time      ┆ is_current ┆ end_time │
│ ---  ┆ ---   ┆ ---   ┆ ---                 ┆ ---        ┆ ---      │
│ i64  ┆ str   ┆ str   ┆ str                 ┆ bool       ┆ str      │
╞══════╪═══════╪═══════╪═════════════════════╪════════════╪══════════╡
│ 2    ┆ Z     ┆ null  ┆ 2020-01-01 00:00:00 ┆ true       ┆ null     │
└──────┴───────┴───────┴─────────────────────┴────────────┴──────────┘


In [31]:
closed_records = (sources_df
.join(updates_df, on="pkey", how="inner")
.filter(
    pl.any_horizontal(
        (pl.col("is_current") == "True"),
        *[(pl.col(f"{attr}") != pl.col(f"{attr}_right")) for attr in ['attr1', 'attr2']]
    )
)
.select(
    pl.col("pkey"),
    *[(pl.col(f"{attr}").alias(f"{attr}")) for attr in ['attr1', 'attr2']],
    pl.lit(False).alias("is_current"),
    pl.col("effective_time"),
    pl.col("effective_time_right").alias("end_time"),
)
)
print(closed_records)

shape: (1, 6)
┌──────┬───────┬───────┬────────────┬─────────────────────┬─────────────────────┐
│ pkey ┆ attr1 ┆ attr2 ┆ is_current ┆ effective_time      ┆ end_time            │
│ ---  ┆ ---   ┆ ---   ┆ ---        ┆ ---                 ┆ ---                 │
│ i64  ┆ str   ┆ str   ┆ bool       ┆ str                 ┆ str                 │
╞══════╪═══════╪═══════╪════════════╪═════════════════════╪═════════════════════╡
│ 2    ┆ B     ┆ B     ┆ false      ┆ 2019-01-01 00:00:00 ┆ 2020-01-01 00:00:00 │
└──────┴───────┴───────┴────────────┴─────────────────────┴─────────────────────┘


In [50]:
# insert records
upsert_records = pl.concat([
    opened_records,
    new_records,
    closed_records,
],
how="align"
)

In [52]:
upsert_records.write_delta(
    target='./delta_lake1/product',
    mode='merge',
    delta_merge_options={
        'predicate': 'source.pkey = target.pkey',
        'source_alias': 'source',
        'target_alias' : 'target',
    }
).when_matched_update_all().when_not_matched_insert_all().execute()

new_df = pl.read_delta(source='./delta_lake1/product').sort('pkey')

print(new_df)


shape: (5, 6)
┌──────┬───────┬───────┬────────────┬─────────────────────┬─────────────────────┐
│ pkey ┆ attr1 ┆ attr2 ┆ is_current ┆ effective_time      ┆ end_time            │
│ ---  ┆ ---   ┆ ---   ┆ ---        ┆ ---                 ┆ ---                 │
│ i64  ┆ str   ┆ str   ┆ bool       ┆ str                 ┆ str                 │
╞══════╪═══════╪═══════╪════════════╪═════════════════════╪═════════════════════╡
│ 1    ┆ A     ┆ A     ┆ true       ┆ 2019-01-01 00:00:00 ┆ null                │
│ 2    ┆ B     ┆ B     ┆ false      ┆ 2019-01-01 00:00:00 ┆ 2020-01-01 00:00:00 │
│ 2    ┆ Z     ┆ null  ┆ true       ┆ 2020-01-01 00:00:00 ┆ null                │
│ 3    ┆ C     ┆ C     ┆ true       ┆ 2020-09-15 00:00:00 ┆ null                │
│ 4    ┆ D     ┆ D     ┆ true       ┆ 2019-01-01 00:00:00 ┆ null                │
└──────┴───────┴───────┴────────────┴─────────────────────┴─────────────────────┘


In [28]:
from typing import List, Any


def type2_scd_upsert_pl(
    sources_df: pl.LazyFrame,
    updates_df: pl.LazyFrame,
    primary_key: str,
    target: str,
    attr_cols: List[str],
    is_current_col: str = "is_current",
    effective_time_col: str = "effective_time",
    end_time_col: str = "end_time",
) -> dict[str, Any]:
    """
    Perform a Type 2 Slowly Changing Dimension (SCD) upsert operation using Polars LazyFrame/DataFrame and
    write to Delta Lake using pl.write_delta().

    Args:
        sources_df (pl.LazyFrame): The source or target Polars LazyFrame scanned from DeltaLake using pl.scan_delta().
        updates_df (pl.LazyFrame): The Polars LazyFrame representing the updates data.
        primary_key (str): The name of the primary key column.
        target (str): The name of the target table to write the upserted records.
        attr_cols (List[str]): A list of attribute column names.
        is_current_col (str, optional): The name of the column indicating if a record is current. Defaults to
        "is_current".
        effective_time_col (str, optional): The name of the column indicating the effective time of a record.
        Defaults to "effective_time".
        end_time_col (str, optional): The name of the column indicating the end time of a record. Defaults to
        "end_time".

    Returns:
        dict[str, Any]: A dictionary representing the result of the upsert operation.
    """
    # validate updates delta tables
    base_cols = sources_df.columns

    required_base_cols = (
        [primary_key] + attr_cols + [is_current_col, effective_time_col, end_time_col]
    )

    if sorted(base_cols) != sorted(required_base_cols):
        raise ValueError(
            f"The base columns has to be the same as the required columns: {required_base_cols!r}"
        )
    # validate updated polars lazyframe
    update_cols = updates_df.columns

    required_update_cols = [primary_key] + attr_cols + [effective_time_col]

    if sorted(update_cols) != sorted(required_update_cols):
        raise ValueError(
            f"The base columns has to be the same as the required columns: {required_update_cols!r}"
        )

    new_records = updates_df.join(sources_df, on=primary_key, how="anti").with_columns(
        pl.lit(True).alias(is_current_col),
        pl.lit(None, pl.Datetime).alias(end_time_col),
    )
    updates_conds = [
        (pl.col(f"{attr}") != pl.col(f"{attr}_right")) for attr in attr_cols
    ]
    updates_records = sources_df.join(updates_df, on=primary_key, how="inner").filter(
        pl.any_horizontal(
            (pl.col(is_current_col) == True),
            *updates_conds,
        )
    )
    open_conds = [(pl.col(f"{attr}_right").alias(f"{attr}")) for attr in attr_cols]
    open_records = updates_records.select(
        pl.col(primary_key),
        *open_conds,
        pl.lit(True).alias(is_current_col),
        pl.col(f"{effective_time_col}_right").alias(f"{effective_time_col}"),
        pl.lit(None, pl.Datetime).alias(end_time_col),
    )

    close_conds = [(pl.col(f"{attr}").alias(f"{attr}")) for attr in attr_cols]
    close_records = updates_records.select(
        pl.col(primary_key),
        *close_conds,
        pl.lit(False).alias(is_current_col),
        pl.col(effective_time_col),
        pl.col(f"{effective_time_col}_right").alias(end_time_col),
    )

    # merging
    upsert_records = pl.concat(
        [
            new_records,
            open_records,
            close_records,
        ],
        how="align",
    )

    return (
        upsert_records.collect(streaming=True)
        .write_delta(
            target=target,
            mode="merge",
            delta_merge_options={
                "predicate": f"source.{primary_key} = target.{primary_key}",
                "source_alias": "source",
                "target_alias": "target",
            },
        )
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute()
    )


In [20]:
from datetime import datetime

sources_df = pl.LazyFrame({
    'product_code' : ['0001', '0002', '0003', '0004'],
    'color' : ['red', 'green','blue','yellow'],
    'size': ['small','medium','large','x-large']
}).cast().with_columns(
    [
        pl.lit(True).alias('is_current'),
        pl.lit(datetime(1900,1,1,0,0,0,0)).alias('effective_time'),
        pl.lit(None, pl.Datetime).alias('end_time'),
    ]
)
sources_df.collect().write_delta(target='./delta_lake1/product1', mode="overwrite")

In [35]:
sources_df.d

[Int64, String, String, Boolean, String, Null]

In [21]:
updates_df = pl.LazyFrame({
    'product_code' : ['0002', '0003', '0004','0005'],
    'color' : ['green','teal','yellow','white'],
    'size': ['medium','large','x-large', 'medium']  
}).with_columns(
    pl.lit(datetime(1999, 1, 20, 0,0,0,0)).alias('effective_time')
)

In [22]:
updates_df.collect()

product_code,color,size,effective_time
str,str,str,datetime[μs]
"""0002""","""green""","""medium""",1999-01-20 00:00:00
"""0003""","""teal""","""large""",1999-01-20 00:00:00
"""0004""","""yellow""","""x-large""",1999-01-20 00:00:00
"""0005""","""white""","""medium""",1999-01-20 00:00:00


In [36]:
sources_dfs = pl.scan_delta(source='./delta_lake1/product1')


In [43]:
isinstance(sources_dfs.schema.get("effective_time"), pl.Datetime)

True

In [41]:
if not isinstance(sources_dfs.schema.get("effective_time"), pl.Datetime):
    raise ValueError("wrong dtypes")

In [29]:
type2_scd_upsert_pl(
    sources_dfs.lazy(),
    updates_df,
    "product_code",
    './delta_lake1/product1',
    ['color', 'size']
)

{'num_source_rows': 7,
 'num_target_rows_inserted': 0,
 'num_target_rows_updated': 13,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 1,
 'num_output_rows': 14,
 'num_target_files_added': 1,
 'num_target_files_removed': 2,
 'execution_time_ms': 109,
 'scan_time_ms': 0,
 'rewrite_time_ms': 8}