# Process the SOA's VBT file in Pandas

### Setup

In [77]:
# coding: utf-8
import os
import copy
import random
from collections.abc import Iterable
from typing import Any

import pandas as pd
import numpy as np
import altair as alt

alt.data_transformers.disable_max_rows()

DataTransformerRegistry.enable('default')

In [25]:
def coalesce(x: Iterable[Any], y: Iterable[Any], debug=None) -> Any:
    if len(x) > 0:
        return x[0]
    elif len(y) > 0:
        return y[0]
    else:
        print("no valid value")
        if debug is not None:
            print(debug)
            print("x:", x)
            print("y:", y)

## Load the unismoke file

In [32]:
genders = ["Male", "Female"]
genders_abbr = ["M", "F"]
# sheet names are a bit wonky
# they have 2015 in the ANB ones but not the ALB ones
# so make the names in two lists
unismoke_sheet_names = [f"2015 {gender} Unismoke ANB" for gender in genders] + [f"{gender} Unismoke ALB" for gender in genders]
# also make a set of tules that have (gender, smoking, age_method)
unismoke_details = [(gender, "PRE", "ANB") for  gender in genders_abbr] + [(gender, "PRE", "ALB") for gender in genders_abbr]
print(unismoke_details)

unismoke_sheets = pd.read_excel(
    "2015-vbt-unismoke-alb-anb.xlsx", sheet_name=unismoke_sheet_names, skiprows=2
)
unismoke_sheets

[('M', 'PRE', 'ANB'), ('F', 'PRE', 'ANB'), ('M', 'PRE', 'ALB'), ('F', 'PRE', 'ALB')]


{'2015 Male Unismoke ANB':     Iss. Age       1       2       3       4       5       6       7       8  \
 0          0    0.24    0.14    0.12    0.10    0.09    0.09    0.08    0.08   
 1          1    0.14    0.12    0.10    0.09    0.09    0.08    0.08    0.08   
 2          2    0.12    0.10    0.09    0.09    0.08    0.08    0.08    0.08   
 3          3    0.10    0.09    0.09    0.08    0.08    0.08    0.08    0.08   
 4          4    0.09    0.09    0.08    0.08    0.08    0.08    0.08    0.08   
 ..       ...     ...     ...     ...     ...     ...     ...     ...     ...   
 91        91   28.92   65.71  138.95  199.45  212.01  228.45  246.02  265.18   
 92        92   39.87   98.28  199.45  212.01  228.45  246.02  265.18  285.54   
 93        93   57.61  141.27  212.01  228.45  246.02  265.18  285.54  306.70   
 94        94   83.08  164.44  228.45  246.02  265.18  285.54  306.70  328.25   
 95        95  116.87  228.45  246.02  265.18  285.54  306.70  328.25  350.02   
 


## Load the smoker distinct file

In [33]:
age_methods = ["ALB", "ANB"]
smoking_statuses = ["SM", "NS"]
smoker_distinct_sheet_names = [
    f"2015 {gender}{smoking} {age_method}" 
    for gender in genders_abbr 
    for smoking in smoking_statuses 
    for age_method in age_methods
] 
print(smoker_distinct_sheet_names)
smoker_distinct_details = [
    (gender, smoking, age_method)
    for gender in genders_abbr 
    for smoking in smoking_statuses 
    for age_method in age_methods
]
print(smoker_distinct_details)

smoker_distinct_sheets = pd.read_excel(
    "2015-vbt-smoker-distinct-alb-anb.xlsx",sheet_name=smoker_distinct_sheet_names, skiprows=2
)
smoker_distinct_sheets

['2015 MSM ALB', '2015 MSM ANB', '2015 MNS ALB', '2015 MNS ANB', '2015 FSM ALB', '2015 FSM ANB', '2015 FNS ALB', '2015 FNS ANB']
[('M', 'SM', 'ALB'), ('M', 'SM', 'ANB'), ('M', 'NS', 'ALB'), ('M', 'NS', 'ANB'), ('F', 'SM', 'ALB'), ('F', 'SM', 'ANB'), ('F', 'NS', 'ALB'), ('F', 'NS', 'ANB')]


{'2015 MSM ALB':     Iss. Age       1       2       3       4       5       6       7       8  \
 0         18    0.67    0.70    0.72    0.75    0.78    0.80    0.83    0.85   
 1         19    0.64    0.66    0.69    0.71    0.73    0.76    0.78    0.78   
 2         20    0.60    0.63    0.65    0.67    0.69    0.71    0.73    0.73   
 3         21    0.56    0.58    0.60    0.62    0.64    0.67    0.68    0.68   
 4         22    0.52    0.53    0.54    0.58    0.61    0.63    0.64    0.65   
 ..       ...     ...     ...     ...     ...     ...     ...     ...     ...   
 73        91   58.87   95.41  177.41  214.85  226.52  241.13  257.32  275.30   
 74        92   70.75  132.60  214.85  226.52  241.13  257.32  275.30  294.79   
 75        93   87.36  161.79  226.52  241.13  257.32  275.30  294.79  315.52   
 76        94  108.95  199.78  241.13  257.32  275.30  294.79  315.52  336.99   
 77        95  137.03  241.13  257.32  275.30  294.79  315.52  336.99  358.54   
 
          

## Combine the two files

In [34]:
all_details = unismoke_details + smoker_distinct_details
all_sheets = [unismoke_sheets[key] for key in unismoke_sheet_names] + [smoker_distinct_sheets[key] for key in smoker_distinct_sheet_names]
print(len(all_details))
print(len(all_sheets))

12
12


## Flatten them and combine to a single dataframe

In [116]:
dfs = []
for details, df in zip(all_details, all_sheets):
    gender, smoker, age_method = details

    # drop the last column
    df_noage = copy.deepcopy(df.iloc[:, : (df.shape[1] - 1)])
    df_noage.columns = ["issue_age"] + list(range(1, 26)) + ["ult"]
    df_long = df_noage.melt(
        id_vars=["issue_age"], var_name="duration", value_name="q"
    )
    df_long["q"] = df_long["q"] / 1000

    ultimate = copy.deepcopy(
        df_long.loc[df_long.duration == "ult", ["issue_age", "q"]]
    )
    ultimate.columns = ["age", "q"]
    ultimate.age = ultimate.age + 26 - 1

    df_long_noult = copy.deepcopy(df_long.loc[df_long.duration != "ult", :])
    df_long_noult["duration"] = df_long_noult.duration.astype("int")

    # this is super-inefficient but...who cares
    # only run it once!
    expanded = pd.DataFrame(
        [
            {
                "gender": gender,
                "smoker": smoker,
                "age_method": age_method,
                "issue_age": issue_age,
                "duration": duration,
                "q": coalesce(
                    df_long_noult.loc[
                        (df_long_noult.issue_age == issue_age)
                        & (df_long_noult.duration == duration),
                        "q",
                    ].values,
                    ultimate.loc[
                        ultimate.age == (issue_age + duration - 1), "q"
                    ].values,
                    debug="issue_age=="
                    + str(issue_age)
                    + ",duration=="
                    + str(duration)
                    + ",attained_age=="
                    + str(issue_age + duration - 1),
                ),
            }
            for issue_age in range(int(df_long_noult.issue_age.min()), 96)
            for duration in range(1, 121)
            if (issue_age + duration - 1) <= 120
        ]
    )
    dfs.append(expanded)
all = pd.concat(dfs)
all.head()

Unnamed: 0,gender,smoker,age_method,issue_age,duration,q
0,M,PRE,ANB,0,1,0.00024
1,M,PRE,ANB,0,2,0.00014
2,M,PRE,ANB,0,3,0.00012
3,M,PRE,ANB,0,4,0.0001
4,M,PRE,ANB,0,5,9e-05


In [36]:
all.shape

(68468, 6)

In [37]:
all.groupby(['gender', 'smoker', 'age_method']).size()

gender  smoker  age_method
F       NS      ALB           5031
                ANB           5031
        PRE     ALB           7055
                ANB           7055
        SM      ALB           5031
                ANB           5031
M       NS      ALB           5031
                ANB           5031
        PRE     ALB           7055
                ANB           7055
        SM      ALB           5031
                ANB           5031
dtype: int64

## Save to parquet

In [39]:
all.to_parquet("vbt.parquet")

## Tests

In [67]:
EXPECTED_FLOAT_INTEGER_COLUMN_RANGES = {
    "issue_age": [0, 99],
    "duration": [1, 121],
    "factor": [0.25, 4],
    "wearoff": [0, 1],
    "q": [0, 1],
}
EXPECTED_NOMINAL_COLUMN_VALUES = {
    "smoker": {"SM", "NS", "PRE"},
    "gender": {"M", "F"},
    "number_of_classes": {1, 2, 3, 5},
    "uw_class": {
        "PRE",
        "SMK",
        "NSM",
        "SMK3",
        "STND1",
        "STND2",
        "PREF",
        "TOB",
        "SPT",
        "NONT",
        "SPNT",
        "UPNT",
    },
}
RANGE_COLUMNS = {"issue_age", "duration", "face_amount"}
COLUMN_DTYPES = {
    "issue_age": "int",
    "duration": "int",
    "uw_class": "varchar(5)",
    "factor": "float",
    "wearoff": "float",
    "q": "float",
    "smoker": "varchar(3)",
    "gender": "varchar(1)",
    "number_of_classes": "int",
    "face_amount": "numeric(15, 2)",
    "attained_age": "int",
    "age_method": "varchar(3)",
}
# expand to also include the ranges:
COLUMN_DTYPES_RANGE_LOWER = {
    f"{col}_lower": COLUMN_DTYPES[col] for col in RANGE_COLUMNS
}
COLUMN_DTYPES_RANGE_UPPER = {
    f"{col}_upper": COLUMN_DTYPES[col] for col in RANGE_COLUMNS
}
COLUMN_DTYPES_EXPANDED = (
    COLUMN_DTYPES | COLUMN_DTYPES_RANGE_LOWER | COLUMN_DTYPES_RANGE_UPPER
)
COLUMNS_BY_ASSUMPTION_TYPE = {
    "Base": {
        "required": {"gender", "smoker", "issue_age", "duration", "q"},
        "optional": {"number_of_classes", "age_method"},
    },
    "UWClassFactor": {
        "required": {"gender", "uw_class", "factor"},
        "optional": {"number_of_classes", "band", "smoker", "ratio"},
    },
    # e.g. 2015 VBT Preferred Wearoff Factors (Appendix F)
    # from https://www.soa.org/resources/experience-studies/2015/2015-valuation-basic-tables/
    "PreferredWearoff": {
        "required": {"issue_age", "duration", "wearoff"},
        "optional": {"attained_age"},
    },
}


def get_status_mark(truthy: bool) -> str:
    if truthy:
        # return random.choice(('✅', '🎉', '😀', '🥳', '😸', '🌈', '🥇', '🎊', '💯', '💪', '🙌'))
        return '✅'
    return '❌'


def test_nominal_column_values(d: pd.DataFrame) -> None:
    for col, values in EXPECTED_NOMINAL_COLUMN_VALUES.items():
        if col in d.columns:
            status = get_status_mark(set(d.loc[:, col].unique()).issubset(values))
            print(
                f"* {status}: `{col}` nominal values `{set(d.loc[:, col].unique())}` are in expected nominal `{values=}`"
            )


def test_float_integer_column_ranges(d: pd.DataFrame) -> None:
    for col, values in EXPECTED_FLOAT_INTEGER_COLUMN_RANGES.items():
        if col in d.columns:
            status1 = get_status_mark(d.loc[:, col].min() >= values[0])
            status2 = get_status_mark(d.loc[:, col].max() <= values[1])
            print(f"* {status1}/{status2}: `{col}` has values within expected lower/upper range: `{values=}`")
            print(
                f"    * `{col}` has values spanning [`{d.loc[:, col].min()}`, `{d.loc[:, col].max()}`]"
            )


def test_range_column_overlaps(d: pd.DataFrame) -> None:
    """Also test that there are no gaps."""
    for col in RANGE_COLUMNS:
        if f"{col}_lower" in d.columns:
            status = get_status_mark(f"{col}_upper" in d.columns)
            print(f"* {status}: found `{col}_lower`, checked that we have `{col}_upper`")
        if f"{col}_upper" in d.columns:
            status = get_status_mark(f"{col}_lower" in d.columns)
            print(f"* {status}: found `{col}_upper`, checked that we have `{col}_lower`")
        if (f"{col}_upper" in d.columns) and (f"{col}_lower" in d.columns):
            lowers = d.sort_values([f"{col}_lower"]).loc[:, f"{col}_lower"]
            uppers = d.sort_values([f"{col}_lower"]).loc[:, f"{col}_upper"]
            status = get_status_mark(
                all(
                    [
                        (lowers[i] == uppers[i - 1]) and (lowers[i] < uppers[i])
                        for i in range(1, d.shape[0])
                    ]
                )
            )
            print(f"* {status}: `{col}_lower`, `{col}_upper` cover their range")


def test_mortality_curve_and_check_increasing(d: pd.DataFrame) -> None:
    '''pick an issue age (18), make sure mortality (mostly) increases'''
    # print(d.columns)
    sorted = d.sort_values(["duration"]).copy()
    if 'number_of_classes' in sorted.columns:
        sorted = sorted.loc[sorted.number_of_classes == 5, :]
    if 'age_method' in sorted.columns:
        sorted = sorted.loc[sorted.age_method == "ANB", :]
    single_curve = sorted.loc[
        (sorted["issue_age"] == 18)
        & (sorted["gender"] == "M")
        & (sorted["smoker"] == "PRE"),
        "q",
    ].values
    # print(single_curve)
    # print(( single_curve[1:] - single_curve[:-1] ))
    # print(( single_curve[1:] - single_curve[:-1] ).min())
    # one line
    status = get_status_mark((single_curve[1:] - single_curve[:-1]).min() >= -0.002)
    print(f"* {status}: issue age 18 mortality strictly increases across duration (within a tolerance of 2 deaths / 1000)")
    # loop for the above:
    # for i in range(1, single_curve.shape[0]):
    #     assert single_curve.iloc[i] >= single_curve.iloc[i-1]
    if not (np.abs((single_curve[1:] / single_curve[:-1]) - 1) <= 0.165).all():
        for duration, diff in zip(range(len(single_curve[1:])), np.abs((single_curve[1:] / single_curve[:-1]) - 1)):
            print(duration, diff)
    status = get_status_mark((np.abs((single_curve[1:] / single_curve[:-1]) - 1) <= 0.165).all())
    print(
        f"* {status}: issue age 18 mortality has no more than a 16.5% difference YoY"
    )


def test_m_f_and_sm_ns_agg_mortality_conditionals(d: pd.DataFrame) -> None:
    if "gender" in d.columns:
        status = get_status_mark(d.loc[d.gender == "M", "q"].mean() > d.loc[d.gender == "F", "q"].mean())
        print(f"* {status}: mortality for M is greater than for F")
    if "smoker" in d.columns:
        smoker_mean = d.loc[(d.smoker == "SM") & (d.issue_age >= 18), "q"].mean()
        presmoker_mean = d.loc[(d.smoker == "PRE") & (d.issue_age >= 18), "q"].mean()
        nonsmoker_mean = d.loc[(d.smoker == "NS") & (d.issue_age >= 18), "q"].mean()
        # print(f"{smoker_mean=}")
        # print(f"{presmoker_mean=}")
        # print(f"{nonsmoker_mean=}")
        status1 = smoker_mean > presmoker_mean
        status2 = presmoker_mean > nonsmoker_mean
        status = get_status_mark(status1 and status2)
        print(f"* {status}: mortality for SM/NS/PRE is SM > PRE > NS")


def test_types_of_columns_for_given_table_and_data_types(
    d: pd.DataFrame, table_type: str = None
) -> None:
    if table_type in COLUMNS_BY_ASSUMPTION_TYPE:
        assumption_type_columns = COLUMNS_BY_ASSUMPTION_TYPE[table_type]
        d_colset = set(d.columns)
        status = get_status_mark(assumption_type_columns["required"].issubset(d_colset))
        print(f'* {status}: table has required columns `{assumption_type_columns["required"]}`')
        status = get_status_mark(            
            d_colset.issubset(
                assumption_type_columns["required"] | assumption_type_columns["optional"]
            )
        )
        print(f'* {status}: non-required tables columns are from optional set `{assumption_type_columns["optional"]}`')
    for col, dtype in COLUMN_DTYPES_EXPANDED.items():
        if col in d.columns:
            if "varchar" in dtype:
                status = get_status_mark(str(d[col].dtype) in {"object"})
            elif "int" in dtype:
                status = get_status_mark(str(d[col].dtype) in {"int64"})
            # floats and numerics:
            else:
                status = get_status_mark(str(d[col].dtype) in {"int64", "float64"})
                print(f"* {status}: column `{col}` dtype `{str(d[col].dtype)}` is as expected `{dtype}`")

    for col in d.columns:
        if col not in COLUMN_DTYPES_EXPANDED:
            print(f"* ⚠️: column `{col}` does not have a specified dtype")


def test_all_column_stuff(d: pd.DataFrame, **kwargs):
    test_nominal_column_values(d)
    test_float_integer_column_ranges(d)
    test_range_column_overlaps(d)
    test_types_of_columns_for_given_table_and_data_types(d, **kwargs)

In [68]:
test_all_column_stuff(all, table_type="Base")

* ✅: `smoker` nominal values `{'SM', 'PRE', 'NS'}` are in expected nominal `values={'SM', 'PRE', 'NS'}`
* ✅: `gender` nominal values `{'F', 'M'}` are in expected nominal `values={'F', 'M'}`
* ✅/✅: `issue_age` has values within expected lower/upper range: `values=[0, 99]`
    * `issue_age` has values spanning [`0`, `95`]
* ✅/✅: `duration` has values within expected lower/upper range: `values=[1, 121]`
    * `duration` has values spanning [`1`, `120`]
* ✅/✅: `q` has values within expected lower/upper range: `values=[0, 1]`
    * `q` has values spanning [`5.9999999999999995e-05`, `0.5`]
* ✅: table has required columns `{'gender', 'smoker', 'issue_age', 'duration', 'q'}`
* ✅: non-required tables columns are from optional set `{'age_method', 'number_of_classes'}`
* ✅: column `q` dtype `float64` is as expected `float`


In [69]:
test_mortality_curve_and_check_increasing(all)

* ✅: issue age 18 mortality strictly increases across duration (within a tolerance of 2 deaths / 1000)
* ✅: issue age 18 mortality has no more than a 16.5% difference YoY


### There should be no jumps in these curves lines (inspect)

In [124]:
all_charts = []
for gender, smoker, age_method in all_details:
    sorted = all.sort_values(["duration"]).copy()
    if 'number_of_classes' in sorted.columns:
        sorted = sorted.loc[sorted.number_of_classes == 5, :]
    if 'age_method' in sorted.columns:
        sorted = sorted.loc[sorted.age_method == age_method, :]
    curves = sorted.loc[
        (sorted["issue_age"].isin(np.arange(0, 100, 5)))
        & (sorted["gender"] == gender)
        & (sorted["smoker"] == smoker),
        :
    ].assign(
        attained_age=lambda x: x.issue_age + x.duration - 1
    )
    chart = alt.Chart(curves).mark_line().encode(
        alt.X("attained_age:O",
              axis=alt.Axis(
                  labelAngle=0,  # Angle the labels for better readability
                  labelOverlap=False,  # Prevent label overlap
                  values=list(range(0, max(curves['attained_age']), 5)),
                  grid=False,
              ),
              # scale=alt.Scale(domain=[0, 120]),
              title="Attained Age"),
        alt.Y("q:Q", 
              scale=alt.Scale(type='log', base=10, domain=[.00001, 1]),
              axis=alt.Axis(
                  grid=True,  # Keep horizontal gridlines
                  gridOpacity=0.3,  # Make gridlines lighter
              ),
              title="Mortality Rate (q)"),  # More descriptive y-axis title
        alt.Color("issue_age:Q",
                  scale=alt.Scale(scheme='viridis'),  # Alternative color scheme
                  legend=alt.Legend(title="Issue Age"))
    ).properties(
        width=900,
        height=500,
        title=f"{gender} {smoker} {age_method}"
    )
    all_charts.append(chart)
alt.vconcat(*all_charts).properties(
     title="Mortality Rates by Issue Age and Attained Age"
)    

In [60]:
test_m_f_and_sm_ns_agg_mortality_conditionals(all)

* ✅: mortality for M is greater than for F
* ✅: mortality for SM/NS/PRE is SM > PRE > NS
