## Setup

In [1]:
from pathlib import Path
import duckdb
from dcpy.utils import duckdb as dcpduckdb
import pandas as pd

# show up to 50 columns when displaying wide DataFrames
pd.set_option("display.max_columns", 50)

In [2]:
# local DuckDB database file; it is deleted and recreated by the setup cell below
DB_PATH = Path("ztl.db")

In [None]:
# start from a clean slate: delete the database file if it exists
DB_PATH.unlink(missing_ok=True)

# create the database and install/load the spatial extension
# (no credentials are configured here; the cells that read from S3 call
# dcpduckdb.setup_s3_secret themselves)
with duckdb.connect(str(DB_PATH)) as connection:
    connection.sql("INSTALL spatial")
    connection.sql("LOAD spatial")


In [None]:
# smoke test: describe the schema of one parquet file stored on S3
with duckdb.connect(str(DB_PATH)) as con:
    dcpduckdb.setup_s3_secret(DB_PATH)
    remote_schema = con.sql(
        "DESCRIBE TABLE 's3://edm-recipes/datasets/dof_dtm/20241122/dof_dtm.parquet'"
    )
    remote_schema.show()

## `dof_dtm`

In [None]:
# load the 20241122 version of dof_dtm from S3 into a local table;
# "create or replace" keeps the cell re-runnable without recreating the database
with duckdb.connect(str(DB_PATH)) as connection:
    dcpduckdb.setup_s3_secret(DB_PATH)
    connection.sql(
        """
            create or replace table dof_dtm_old as
            select * from
            read_parquet(
                's3://edm-recipes/datasets/dof_dtm/20241122/dof_dtm.parquet',
                filename = true
            )
        """
    )

In [None]:
# load the 20250110 version of dof_dtm from S3 into a local table;
# "create or replace" keeps the cell re-runnable without recreating the database
with duckdb.connect(str(DB_PATH)) as connection:
    dcpduckdb.setup_s3_secret(DB_PATH)
    connection.sql(
        """
            create or replace table dof_dtm_new as
            select * from
            read_parquet(
                's3://edm-recipes/datasets/dof_dtm/20250110/dof_dtm.parquet',
                filename = true
            )
        """
    )

In [None]:
# list every table currently stored in the database
with duckdb.connect(str(DB_PATH)) as con:
    all_tables = con.sql("SHOW ALL TABLES")
    all_tables.show()

In [None]:
# inspect the columns of the old table (up to 100 rows of output)
with duckdb.connect(str(DB_PATH)) as con:
    old_schema = con.sql("describe table dof_dtm_old")
    old_schema.show(max_rows=100)

In [None]:
# preview the contents of the old table
with duckdb.connect(str(DB_PATH)) as con:
    old_preview = con.sql("select * from dof_dtm_old")
    old_preview.show()

In [None]:
# preview the contents of the new table
with duckdb.connect(str(DB_PATH)) as con:
    new_preview = con.sql("select * from dof_dtm_new")
    new_preview.show()

In [None]:
# pull bbl and geometry from the old table into a DataFrame, ordered by bbl
with duckdb.connect(str(DB_PATH)) as con:
    old_relation = con.sql(
        "select bbl, wkb_geometry from dof_dtm_old order by bbl asc"
    )
    dof_dtm_old = old_relation.df()
dof_dtm_old

In [None]:
# pull bbl and geometry from the new table into a DataFrame, ordered by bbl
with duckdb.connect(str(DB_PATH)) as con:
    new_relation = con.sql(
        "select bbl, wkb_geometry from dof_dtm_new order by bbl asc"
    )
    dof_dtm_new = new_relation.df()
dof_dtm_new

In [11]:
# generic names consumed by the comparison cells below
INDEX_COLUMN = "bbl"
old_data_raw, new_data_raw = dof_dtm_old, dof_dtm_new

In [12]:
def compare_data(
    old_data: pd.DataFrame, new_data: pd.DataFrame, max_diff_rows: int = 10
) -> None:
    """Print a preview of both frames and a summary of the rows that differ.

    The frames are compared with ``DataFrame.compare``, which requires both
    frames to carry identical row and column labels.

    Args:
        old_data: previous version of the data.
        new_data: new version of the data.
        max_diff_rows: maximum number of differing rows to print.
    """
    print("PREVIEW OLD DATA")
    print(old_data.head().to_markdown())
    old_data.info()
    print("PREVIEW NEW DATA")
    print(new_data.head().to_markdown())
    new_data.info()

    # align_axis=0 stacks every differing row as two consecutive rows
    # (labelled "self" and "other"), hence the division by two below
    compare_result = old_data.compare(new_data, align_axis=0, keep_equal=True)
    rows_with_diff_count = len(compare_result) // 2

    if rows_with_diff_count == 0:
        print(f"Files are identical ({len(old_data)} rows)")
        return

    print(
        f"""
            Files aren't identical in
                {rows_with_diff_count:,} rows out of
                {len(old_data):,} rows in old data
                {len(new_data):,} rows in new data
            """
    )
    # relabel the inner index level so the output says which frame a row is from
    compare_result = compare_result.rename(
        index={"self": "old_data", "other": "new_data"}, level=1
    )
    rows_shown = min(rows_with_diff_count, max_diff_rows)
    print(f"First {rows_shown:,} differing rows")
    # each differing row occupies two rows in compare_result
    print(compare_result.head(2 * rows_shown).to_markdown())

In [13]:
def show_data_with_unique_indices(
    old_data: pd.DataFrame,
    new_data: pd.DataFrame,
    index_column: str,
) -> None:
    """Print the rows whose ``index_column`` value exists in only one frame.

    Rows found only in ``old_data`` are printed first, followed by rows found
    only in ``new_data``; both are rendered as markdown tables.
    """
    old_keys = set(old_data[index_column].unique())
    new_keys = set(new_data[index_column].unique())

    only_in_old = old_keys - new_keys
    old_only_mask = old_data[index_column].isin(only_in_old)
    print("Unique rows in OLD DATA")
    print(old_data[old_only_mask].to_markdown())

    only_in_new = new_keys - old_keys
    new_only_mask = new_data[index_column].isin(only_in_new)
    print("Unique rows in NEW DATA")
    print(new_data[new_only_mask].to_markdown())

In [14]:
def limit_rows_to_compare(
    old_data: pd.DataFrame,
    new_data: pd.DataFrame,
    index_column: str,
    common_indices: set,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Trim both frames to the same columns and to rows with shared index values.

    When one frame has more columns than the other, the columns of the
    narrower frame are used for both. Each frame is then filtered to rows whose
    ``index_column`` value is in ``common_indices``, sorted by all retained
    columns and given a fresh 0..n-1 index.

    Returns:
        The trimmed ``(old_data, new_data)`` pair.
    """
    old_width = len(old_data.columns)
    new_width = len(new_data.columns)

    if old_width > new_width:
        print("!! old data has more columns")
        shared_columns = new_data.columns
    else:
        if new_width > old_width:
            print("!! new data has more columns")
        shared_columns = old_data.columns

    old_trimmed = old_data[shared_columns]
    new_trimmed = new_data[shared_columns]
    sort_keys = shared_columns.to_list()

    def _rows_in_common(frame: pd.DataFrame) -> pd.DataFrame:
        # keep only shared index values, then order rows deterministically
        in_common = frame[index_column].isin(common_indices)
        ordered = frame[in_common].sort_values(by=sort_keys)
        return ordered.reset_index(drop=True)

    return _rows_in_common(old_trimmed), _rows_in_common(new_trimmed)


In [None]:
# A row-by-row comparison is only meaningful when both frames have the same
# number of rows AND the same index values in the same order; equal row counts
# alone do not guarantee that the rows line up.
same_length = len(old_data_raw) == len(new_data_raw)
same_indices = same_length and old_data_raw[INDEX_COLUMN].equals(
    new_data_raw[INDEX_COLUMN]
)

if same_indices:
    compare_data(old_data=old_data_raw, new_data=new_data_raw)
else:
    print("WARNING! Can only compare data of the same length and indices!")
    print(
        f"""
            {len(old_data_raw):,} rows in old data
            {len(new_data_raw):,} rows in new data
        """
    )
    print("detail differences ...")
    show_data_with_unique_indices(old_data_raw, new_data_raw, INDEX_COLUMN)
    common_indices = set(old_data_raw[INDEX_COLUMN].unique()).intersection(
        set(new_data_raw[INDEX_COLUMN].unique())
    )

    print("compare common rows ...")
    print(f"{len(common_indices)} common index values")
    old_data_to_compare, new_data_to_compare = limit_rows_to_compare(
        old_data_raw, new_data_raw, INDEX_COLUMN, common_indices
    )
    if len(old_data_to_compare) == len(new_data_to_compare):
        compare_data(old_data_to_compare, new_data_to_compare)
    else:
        # compare_data relies on DataFrame.compare, which needs identically
        # labelled frames; differing lengths here mean an index value is
        # repeated a different number of times in the two frames
        print(
            f"""
            Cannot compare common rows: row counts still differ
                {len(old_data_to_compare):,} common rows in old data
                {len(new_data_to_compare):,} common rows in new data
            """
        )