In [None]:
import utils
import polars as pl

from constants import list_id_cols
from pathlib import Path
from collections import namedtuple

# Load the source table from the Access database file through the project helper.
# NOTE(review): utils.get_df is not visible here; presumably it returns a polars
# DataFrame, since `.select(pl.col(...))` is called on the result later — confirm.
dfsource = utils.get_df(Path("input/test data.accdb"))

In [None]:
# Convert the flat col_list into col_map: one ColMap entry per lesion slot.
# Each slot occupies `chunk_size` consecutive names in col_list, ordered as
# type column, size column, then the site columns.

col_list = [
  "TYPE1",
  "SIZE1",
  "SITE1A",
  "SITE1B",
  "SITE1C",
  "SITE1D",
  "TYPE2",
  "SIZE2",
  "SITE2A",
  "SITE2B",
  "SITE2C",
  "SITE2D",
  "TYPE3",
  "SIZE3",
  "SITE3A",
  "SITE3B",
  "SITE3C",
  "SITE3D",
  "OTHER PATHOLOGY",
  "SIZE OTHER",
  "SITE OTHER A",
  "SITE OTHER B",
  "SITE OTHER C",
  "SITE OTHER D",
]

col_list_len = len(col_list)
chunk_size = 6  # columns per lesion slot: type, size and the site columns

assert (
  col_list_len % chunk_size
) == 0, f"len(col_list): {col_list_len} not divisible by chunk_size: {chunk_size}"

# Floor division keeps this an exact integer; divisibility is asserted above.
chunks = col_list_len // chunk_size

# define namedtuple for lesion column names
ColMap = namedtuple("ColMap", ["id", "type", "size", "site"])

# save column names as col_map
col_map: list[ColMap] = []

for n in range(chunks):
  start = n * chunk_size  # index of the slot's first column in col_list
  # First two names are the type and size columns; the remainder are the sites.
  type_col, size_col, *site_cols = col_list[start : start + chunk_size]
  # The first three slots are numbered "1".."3"; any later slot is "other".
  # A dedicated name is used so the builtin `id` is not shadowed for later cells.
  lesion_id = str(n + 1) if n <= 2 else "other"
  col_map.append(ColMap(lesion_id, type_col, size_col, site_cols))


In [None]:
# Display the generated column map so the grouping can be checked by eye.
col_map

In [None]:
def _get_lesion_expr():
  """Build the expressions that gather every lesion slot into one list column.

  Reads the notebook-level `col_map` and `chunks`.

  Returns:
    A two-element list: the "LESION" column, and a "lesion_list" column made by
    concatenating one struct per lesion slot. Each struct has the fields
    lesion_id, type, size, site_A, site_B, site_C and site_D.
  """

  def _slot_struct(entry):
    """Return the struct expression for one ColMap entry, aliased to its id."""
    # site_A..site_D take the first four site columns of the entry, in order.
    site_fields = {
      f"site_{letter}": entry.site[pos] for pos, letter in enumerate("ABCD")
    }
    slot = pl.struct(
      lesion_id=pl.lit(entry.id),  # literal tag, not a column lookup
      type=entry.type,
      size=entry.size,
      **site_fields,
    )
    return slot.alias(entry.id)

  slot_structs = [_slot_struct(col_map[k]) for k in range(chunks)]
  return [pl.col("LESION"), pl.concat_list(slot_structs).alias("lesion_list")]


In [None]:
# dflesion: long-format lesion table.
# 1. select the id columns, LESION and the per-row "lesion_list" of structs;
# 2. explode the list so each struct in it gets its own row;
# 3. unnest the struct so its fields (lesion_id, type, size, site_A..site_D)
#    become regular columns.
dflesion = (
  dfsource.select(pl.col(list_id_cols), *_get_lesion_expr())
  .explode("lesion_list")
  .unnest("lesion_list")
)

# Display the result for inspection.
dflesion

In [None]:
# List the column names of the long-format table (id columns, LESION, struct fields).
dflesion.columns

In [None]:
# Rule 7: LESION vs any data filled
# Open question: replace not applicable with nulls?
# R7_lesion_filled is True when at least one lesion field on the row is non-null;
# the rows returned are those where LESION disagrees with that flag.
# NOTE(review): dflesion has one row per lesion slot, so a record with
# LESION == True would also be returned for each of its empty slots — confirm
# whether a per-record aggregation is intended here.
dflesion.with_columns(
  pl.any_horizontal(
    pl.col(["type", "size", "site_A", "site_B", "site_C", "site_D"]).is_not_null()
  ).alias("R7_lesion_filled")
).filter(pl.col("LESION") != pl.col("R7_lesion_filled"))


In [None]:
# attempt to replace N/A with nulls
null_values = ["0 - not applicable", "00 = not applicable"]
# First placeholder is applied to type/size, second to the site columns.
type_size_na, site_na = null_values

# True when any lesion field on the row holds a (non-null) value.
has_any_lesion_value = pl.any_horizontal(
  pl.col(["type", "size", "site_A", "site_B", "site_C", "site_D"]).is_not_null()
)

# Step 1 blanks the placeholders; step 2 flags rows with remaining data as 1/0.
# The flag is kept as an integer so it can be summed per record below.
dflesion_2 = dflesion.with_columns(
  pl.col(["type", "size"]).replace(type_size_na, None),
  pl.col(["site_A", "site_B", "site_C", "site_D"]).replace(site_na, None),
).with_columns(
  pl.when(has_any_lesion_value).then(1).otherwise(0).alias("lesion_filled")
)

# Count filled lesion slots per record (id columns + LESION).
filled_per_record = dflesion_2.group_by(*list_id_cols, "LESION").agg(
  pl.col("lesion_filled").sum()
)

# Two kinds of mismatch: LESION set but no slot filled, or LESION unset with data.
lesion_without_data = (pl.col("LESION") == True) & (pl.col("lesion_filled") == 0)
data_without_lesion = (pl.col("LESION") == False) & (pl.col("lesion_filled") > 0)

filled_per_record.filter(lesion_without_data | data_without_lesion)

In [None]:
# Rule 8: lesion data completeness
# For every lesion slot that has any data (lesion_filled == 1), type, size and at
# least one site must all be present; the rows returned are the incomplete slots.
dflesion_2.with_columns(
  pl.col("type").is_not_null().alias("type_filled"),
  pl.col("size").is_not_null().alias("size_filled"),
  # The unnested site fields are named site_A..site_D (not "A".."D").
  pl.any_horizontal(
    pl.col("site_A", "site_B", "site_C", "site_D").is_not_null()
  ).alias("site_filled"),
).with_columns(
  pl.all_horizontal("type_filled", "size_filled", "site_filled").alias("is_complete")
).filter(
  # lesion_filled is an integer 1/0 flag, so compare against 1 rather than True.
  (pl.col("lesion_filled") == 1) & ~pl.col("is_complete")
)
