# 0. Setup

In [None]:
import polars as pl
from rich import print as rprint

import logging
import logging.config
import re
from collections import Counter
import json
import h5py
import numpy as np
import polars as pl
from PIL import Image
from pydantic import BaseModel, field_validator
from pathlib import Path
import io
import typing as T
from sklearn import model_selection, dummy as sk_dummy
from sklearn.metrics import roc_auc_score

In [None]:
# Logging configuration in the `logging.config.dictConfig` schema, kept as a JSON string.
# - One handler ("stdout", a rich.logging.RichHandler at DEBUG) is attached to the root logger.
# - The "third_party" filter is built through the "()" factory key from
#   `__main__.DependencyFilter` with param=20 (the numeric value of logging.INFO),
#   so DependencyFilter must be defined in `__main__` before the config is applied.
logger_config_json = """{
    "version": 1,
    "disable_existing_loggers": false,
    "formatters": {
      "rich": {
        "format": "%(message)s",
        "datefmt": "%Y-%m-%dT%H:%M:%S%z"
      }
    },
    "filters": {
      "third_party": {
        "()": "__main__.DependencyFilter",
        "param": 20
      }
    },
    "handlers": {
      "stdout": {
        "class": "rich.logging.RichHandler",
        "formatter": "rich",
        "level": "DEBUG",
        "filters": ["third_party"]
      }
    },
    "loggers": {
      "root": {
        "level": "DEBUG",
        "handlers": [
          "stdout"
        ]
      }
    }
  }

"""

class DependencyFilter(logging.Filter):
    """Filter to only keep third party logrecords at or above `param`.

    Records whose logger name starts with ``"tools"`` or equals ``"__main__"``
    are treated as first party and always pass. Every other record passes only
    when its numeric level is at least ``param``.

    logrecord: https://docs.python.org/3/library/logging.html#logrecord-attributes
    logging levels: https://docs.python.org/3/library/logging.html
    custom level handling: https://docs.python.org/3/howto/logging-cookbook.html#custom-handling-of-levels
    custom filters: https://docs.python.org/3/howto/logging-cookbook.html#configuring-filters-with-dictconfig

    Args:
        param: Minimum numeric log level a third party record needs to pass.
    """

    def __init__(self, param: int):
        # Initialise the base class so the standard Filter attributes exist.
        super().__init__()
        self.param = param

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True when `record` should be emitted."""
        is_1st_party = record.name.startswith("tools") or record.name == "__main__"
        # First party records always pass; third party ones need a high enough level.
        return is_1st_party or record.levelno >= self.param


def setup_logging():
    """Parse the JSON in `logger_config_json` and apply it with `logging.config.dictConfig`."""
    logging.config.dictConfig(json.loads(logger_config_json))
    
class Settings(BaseModel):
    """Paths and column names shared by the notebook cells."""

    # Root directories for the input data and for the files the notebook writes.
    path_input: Path = Path("/kaggle/input")
    path_work: Path = Path("/kaggle/working")
    # Target column, row id column (used for the submission) and patient id column
    # (used as the grouping key of the cross-validation split).
    y_col: str = "target"
    id_col: str = "isic_id"
    id_col_patient: str = "patient_id"

    # NOTE(review): pydantic v2 does not run field validators on default values
    # unless `validate_default=True` is configured, so a bare `Settings()` skips
    # this existence check - confirm whether that is intended.
    @field_validator("path_input", "path_work", mode="after")
    @classmethod
    def is_valid_path(cls, v: Path) -> Path:
        """Return `v` unchanged when it exists on disk.

        Raises:
            FileNotFoundError: If `v` does not exist.
        """
        if not v.exists():
            msg = f"{v} does not seem to exist."
            raise FileNotFoundError(msg)
        return v

    @property
    def path_input_isic2024(self) -> Path:
        """Sub-directory "isic-2024-challenge" below `path_input`."""
        return self.path_input / "isic-2024-challenge"

In [None]:
# Apply the logging configuration before the notebook logger is created.
setup_logging()
logger = logging.getLogger(__name__)
# Built from the class defaults only (no overrides are passed).
settings = Settings()

In [None]:
!ls -lh {settings.path_input_isic2024}

# 1. Prepare data

In [None]:
# Directory that holds the competition files.
path_data = settings.path_input_isic2024
# Bare tuple expression: displayed as the cell output to confirm the directory exists.
path_data, path_data.exists()

In [None]:
# HDF5 image archives for the train and test sets.
path_train_img = path_data / "train-image.hdf5"
path_test_img = path_data / "test-image.hdf5"

# CSV metadata tables for the train and test sets.
path_train_meta = path_data / "train-metadata.csv"
path_test_meta = path_data / "test-metadata.csv"

In [None]:
# Stop early if any of the four expected input files is missing.
assert path_train_img.exists()
assert path_test_img.exists()
assert path_train_meta.exists()
assert path_test_meta.exists()

In [None]:
from itertools import islice

# Opened read-only; the handle is intentionally left open for later cells.
train_img = h5py.File(path_train_img, "r")
# islice stops after five keys instead of enumerating every key in the archive.
logger.info(f"train keys {len(train_img.keys())=:_}, 5 example keys{list(islice(train_img.keys(), 5))}")

In [None]:
from itertools import islice

# Opened read-only; the handle is intentionally left open for later cells.
test_img = h5py.File(path_test_img, "r")
# islice stops after five keys instead of enumerating every key in the archive.
logger.info(f"test keys {len(test_img.keys())=:_}, 5 example keys{list(islice(test_img.keys(), 5))}")

In [None]:
def hdf_entry_to_image(hdf: h5py.Group, key: str) -> Image.Image:
    """Open the entry stored under `key` in `hdf` as a PIL image.

    Args:
        hdf: Open HDF5 file or group that is indexed by `key`
            (`h5py.File` is a subclass of `h5py.Group`).
        key: Name of the dataset to read.

    Returns:
        The image returned by `PIL.Image.open` for the stored value.
    """
    # `[()]` reads the complete dataset value; presumably the encoded image bytes.
    raw = hdf[key][()]
    return Image.open(io.BytesIO(raw))

In [None]:
# if "ISIC_0024200" in train_img:
#     image = hdf_entry_to_image(train_img, "ISIC_0024200")
#     display(image)

In [None]:
# if "ISIC_0015657" in test_img:
#     image = hdf_entry_to_image(test_img, "ISIC_0015657")
#     display(image)

In [None]:
# Load the training metadata; head() is displayed as the cell output.
df_train_meta = pl.read_csv(path_train_meta)
df_train_meta.head()

In [None]:
# Load the test metadata; head() is displayed as the cell output.
df_test_meta = pl.read_csv(path_test_meta)
df_test_meta.head()

Light data wrangling

In [None]:
def sex_str2bool(val: T.Optional[str]) -> T.Optional[bool]:
    """Encode the `sex` value as a boolean.

    Args:
        val: ``"male"``, ``"female"``, or a missing value given as ``""`` or ``None``.

    Returns:
        ``True`` for ``"male"``, ``False`` for ``"female"``, ``None`` when missing.

    Raises:
        ValueError: If `val` is any other value.
    """
    # Missing stays missing, whether it arrives as a null or as an empty string.
    if val is None or val == "":
        return None
    if val == "male":
        return True
    if val == "female":
        return False
    raise ValueError(f"{val=} was expected to be 'male' or 'female' only.")


def wrangle(df: pl.DataFrame, y_col: str) -> pl.DataFrame:
    """Normalise column types of a metadata frame.

    Steps applied:
      * ``anatom_site_general``: empty strings become ``"unknown"``.
      * ``sex``: mapped through `sex_str2bool` into a Boolean column.
      * ``tbp_lv_symm_2axis_angle``: cast to Int32.
      * ``age_approx``: cast to Float32; when the column was read as strings,
        the literal ``"NA"`` is turned into a null first.
      * `y_col`: cast to Boolean when the column is present.
      * ``image_type``: dropped.

    Args:
        df: Metadata frame containing the columns listed above.
        y_col: Name of the target column; it may be absent from `df`.

    Returns:
        A new frame with the conversions applied.
    """
    age = pl.col("age_approx")
    if df["age_approx"].dtype == pl.String:
        # A string column means missing ages are spelled "NA"; null them before casting.
        age = age.replace("NA", None)
    # Otherwise the column is already numeric (original note: the test set has
    # no missing values, so it is read as float64).

    conversions = [
        pl.col("anatom_site_general").replace("", "unknown"),
        pl.col("sex").map_elements(sex_str2bool, return_dtype=pl.Boolean),
        pl.col("tbp_lv_symm_2axis_angle").cast(pl.Int32),
        age.cast(pl.Float32),
    ]

    # Only frames that carry the target column get it converted.
    if y_col in df.columns:
        conversions.append(pl.col(y_col).cast(pl.Boolean))

    converted = df.with_columns(*conversions)
    # Original note: all instances of image_type share the same value.
    return converted.drop(["image_type"])

In [None]:
# Try wrangle on the first rows of the test metadata; the result is only displayed.
wrangle(df_test_meta.head(), settings.y_col)

In [None]:
# Try wrangle on the first rows of the training metadata; the result is only displayed.
wrangle(df_train_meta.head(), settings.y_col)

In [None]:
# Wrangle the full frames and rebind both names to the converted versions.
df_train_meta = wrangle(df_train_meta, settings.y_col)
df_test_meta = wrangle(df_test_meta, settings.y_col)

In [None]:
# Sample submission inside the input directory; the existence check is displayed.
path_submission = path_data / "sample_submission.csv"
path_submission.exists()

In [None]:
# Load the sample submission; head() is displayed as the cell output.
df_sample_submission = pl.read_csv(path_submission)
df_sample_submission.head()

# 2. Dummy model

In [None]:
# Feature columns: every training column that also exists in the test metadata,
# excluding the target column and the two id columns.
x_cols = [c for c in df_train_meta.columns if c not in [settings.y_col, settings.id_col, settings.id_col_patient] and c in df_test_meta.columns]
logger.info(f"feature columns ({len(x_cols)}) {x_cols=}")

In [None]:
def get_Xy(
    df: pl.DataFrame, x_cols: T.Iterable[str], y_col: str
) -> T.Tuple[pl.DataFrame, T.Optional[pl.Series]]:
    """Split `df` into its feature columns and, when present, the target column.

    Args:
        df: Frame to split.
        x_cols: Names of the feature columns to select.
        y_col: Name of the target column; it may be absent from `df`.

    Returns:
        ``(features, target)``, where ``target`` is ``None`` when `df` has no
        `y_col` column.
    """
    features = df[x_cols]
    # Frames without the target column yield no labels.
    target = df[y_col] if y_col in df.columns else None
    return features, target

In [None]:
# Features and target of the full training metadata.
X, y = get_Xy(df_train_meta, x_cols, settings.y_col)

In [None]:
# Number of folds handed to StratifiedGroupKFold.
n_splits = 5
splitter = model_selection.StratifiedGroupKFold(n_splits=n_splits)

In [None]:
# Index pairs are produced per fold; the patient id column is the grouping key.
splits = splitter.split(X,y,groups=df_train_meta[settings.id_col_patient])

In [None]:
# Only the first fold is taken from `splits`: ix0 selects the fitting rows,
# ix1 the rows held out for validation.
ix0, ix1 = next(iter(splits))

X0 = X[ix0,:]
y0 = y[ix0]
X1 = X[ix1,:]
y1 = y[ix1]

In [None]:
# Baseline classifier using the "most_frequent" strategy.
dummy_model = sk_dummy.DummyClassifier(strategy="most_frequent")

In [None]:
# Fit the baseline on the first fold's fitting rows.
dummy_model.fit(X0, y0)

In [None]:
# Column 1 of predict_proba is used as the score; presumably the positive class - verify against dummy_model.classes_.
y_pred1 = dummy_model.predict_proba(X1)[:,1]
logger.info(f"Inference for validation set ({len(y_pred1):_} samples), first 21 entries: {y_pred1[:21]}")

In [None]:
def calc_score(y_true: T.Iterable, y_prob: T.Iterable, min_tpr: float = 0.8) -> float:
    """Compute a partial AUC restricted to the region given by `min_tpr`.

    Labels and scores are both flipped (``|y - 1|`` and ``1 - p``), so the
    ``max_fpr`` limit passed to `roc_auc_score` equals ``|1 - min_tpr|``.
    The value returned by `roc_auc_score` is then mapped linearly from
    ``[0.5, 1.0]`` to ``[0.5 * max_fpr**2, max_fpr]``.

    Args:
        y_true: Ground-truth labels, convertible with `np.asarray`.
        y_prob: Predicted scores, one per label.
        min_tpr: Threshold from which `max_fpr` is derived.

    Returns:
        The rescaled partial AUC.
    """
    max_fpr = abs(1 - min_tpr)

    flipped_truth = abs(np.asarray(y_true) - 1)
    flipped_prob = np.array([1.0 - p for p in y_prob])
    scaled_auc = roc_auc_score(flipped_truth, flipped_prob, max_fpr=max_fpr)

    # change scale from [0.5, 1.0] to [0.5 * max_fpr**2, max_fpr]
    # https://math.stackexchange.com/questions/914823/shift-numbers-into-a-different-range
    lower = 0.5 * max_fpr**2
    upper = max_fpr
    return lower + (upper - lower) / (1.0 - 0.5) * (scaled_auc - 0.5)

In [None]:
# Score the validation predictions with the default min_tpr of calc_score.
val_score = calc_score(y1, y_pred1)
logger.info(f"{val_score=}")

In [None]:
# Test features only; the second return value is discarded.
X_test, _ = get_Xy(df_test_meta, x_cols, settings.y_col)

In [None]:
# Same column selection as for the validation predictions (column 1 of predict_proba).
y_pred_test = dummy_model.predict_proba(X_test)[:, 1]
logger.info(f"Inference for test set ({len(y_pred_test):_} samples), first 21 entries: {y_pred_test[:21]}")

# 3. Submission

In [None]:
def get_df_submission(
    df_test: pl.DataFrame, y_pred_test: np.ndarray, id_col: str, y_col: str
) -> pl.DataFrame:
    """Build the two-column submission frame.

    Args:
        df_test: Frame that provides the `id_col` column.
        y_pred_test: Predictions to store under `y_col`.
        id_col: Name of the identifier column.
        y_col: Name of the prediction column.

    Returns:
        A frame with the columns `id_col` and `y_col`, in that order.
    """
    columns = {id_col: df_test[id_col], y_col: y_pred_test}
    return pl.from_dict(columns)

In [None]:
# Assemble the submission frame; head() is displayed as the cell output.
df_submission = get_df_submission(df_test_meta, y_pred_test, settings.id_col, settings.y_col)
df_submission.head()

In [None]:
# NOTE: rebinds `path_submission`, which until here pointed at the sample
# submission in the input directory; from now on it is the output file.
path_submission = settings.path_work / "submission.csv"
path_submission.exists()

In [None]:
# Write the submission as comma-separated CSV with float_precision=8.
df_submission.write_csv(path_submission, float_precision=8, separator=",")

In [None]:
!head {path_submission}

In [None]:
!head {path_data}/sample_submission.csv

In [None]:
def create_submit_message(
    model,
    val_score: float,
    n_splits: int,
    comment: T.Optional[T.Dict[str, T.Any]] = None,
) -> str:
    """Build the multi-line description that accompanies a submission.

    Args:
        model: Object whose ``str()`` representation describes the model.
        val_score: Validation score to report.
        n_splits: Number of cross-validation splits to report.
        comment: Optional JSON-serialisable mapping appended as indented JSON.

    Returns:
        The message text. The indentation in front of the "model:" and
        "comment:" lines is part of the established output and is kept as is.
    """
    msg = f"validation score ({n_splits=}): {val_score=}\n    \n    model: {model!s}"

    if comment is not None:
        msg = f"{msg}\n        \n        comment: {json.dumps(comment, indent=2)}"

    return msg

# NOTE: n_splits is reported in the message, but val_score comes from the first fold only.
submit_message = create_submit_message(dummy_model, val_score, n_splits, comment={"splitter": splitter.__str__(), "group_col": settings.id_col_patient})
logger.info(f"{submit_message=}")    

In [None]:
# Final marker in the log once every cell has run.
logger.info("Done!")