### **01 - Incidence of MLTC**
#### **01A - Subsegment configuration**


**Imports**

In [1]:
# required imports
from pyspark.sql import functions as F  # noqa: N812

# requires blank line after last import


**Parameter cell**

In [6]:
# parameter cell
incidence_schema = ""  # "mltc_incidence_outputs_v40_20230331"
segmentation_schema = ""  # "obh_segmentation_v40_20230331"
specification_schema = ""  # "specification_v40_20230331"

# optional, can be blank


In [14]:
# Set parameters in Spark configuration with 'param.' prefix (for use in SQL cells)
spark.conf.set("param.incidence_schema", incidence_schema)
spark.conf.set("param.segmentation_schema", segmentation_schema)
spark.conf.set("param.specification_schema", specification_schema)
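
As a rough illustration of how the `param.` values reach SQL cells, the sketch below emulates `${...}` substitution in plain Python. It assumes the SQL engine replaces each `${key}` reference with the matching configuration value before the query is parsed. `substitute_params` and the `conf` dictionary are hypothetical stand-ins for illustration, not Spark APIs.

```python
import re

# Stand-in for the Spark configuration (assumption: a plain dict).
# The schema name is the example value from the parameter cell comment.
conf = {"param.incidence_schema": "mltc_incidence_outputs_v40_20230331"}


def substitute_params(sql: str, conf: dict) -> str:
    """Replace each ${key} reference with its configured value (hypothetical helper)."""
    return re.sub(r"\$\{([^}]+)\}", lambda match: conf[match.group(1)], sql)


query = "SELECT * FROM ${param.incidence_schema}.config_subsegments"
resolved = substitute_params(query, conf)
print(resolved)
```

Substitution of this kind applies to SQL text. Python cells can use the Python variables directly, for example in an f-string.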


---
#### **01A - Subsegment configuration**

Some subsegments are excluded from this analysis, so a purpose-specific configuration is applied to restrict to the selected subset of subsegments.

The following section creates two tables, which are copies of `dim_subsegment_combinations` and `breakdown_subsegment_combinations`, restricted to a subset of conditions.

This allows for recalculation of the `condition_count` field for the selected subset, which is then used in subsequent analyses to identify progression from one state of MLTC to the next.


**a - Create config view to select subset of subsegments**

- Use `lookup_subsegments` table to obtain `subsegment_code`, `_id`, `_name` and `_description`
- Apply binary flags for whether each subsegment is included
- Restrict output to only subsegments flagged for inclusion

**Instructions**: Edit the TRUE/FALSE flags in the cell below to change which subsegments are included in the analysis
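
The selection logic described above can be sketched in plain Python. This is a minimal illustration only: the lookup rows, codes and ids are made up, and the real lookup comes from `lookup_subsegments`.

```python
# Hypothetical rows standing in for lookup_subsegments (codes and ids are made up).
lookup_subsegments = [
    {"subsegment_code": "A", "subsegment_id": 1, "subsegment_name": "Asthma"},
    {"subsegment_code": "B", "subsegment_id": 2, "subsegment_name": "Chronic_Pain"},
    {"subsegment_code": "C", "subsegment_id": 3, "subsegment_name": "Diabetes"},
]

# Inclusion flags keyed by subsegment, in the style of the config's VALUES list.
config_flags = {"asthma": True, "chronic_pain": False, "diabetes": True}

# Normalise case on both sides so the match is case insensitive.
flags_by_upper_name = {name.upper(): flag for name, flag in config_flags.items()}

# Inner join on the normalised name, keeping only subsegments flagged for inclusion.
config_subsegments = [
    {**row, "is_included_in_config": True}
    for row in lookup_subsegments
    if flags_by_upper_name.get(row["subsegment_name"].upper()) is True
]

included_names = [row["subsegment_name"] for row in config_subsegments]
print(included_names)
```

Lookup rows with no matching flag are dropped as well, which mirrors the behaviour of an inner join.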

<blockquote style="color: #D8000C; background-color: #FFD2D2; padding: 10px; border-left: 6px solid #D8000C;">
  <strong>⚠️ Warning:</strong> DROP TABLE is currently commented out, as this table does not need to be recreated each time the incidence analysis is run.
</blockquote>

In [4]:
%%sql

--DROP TABLE IF EXISTS ${param.incidence_schema}.config_subsegments

In [16]:
%%sql

--CREATE    TABLE     ${param.incidence_schema}.config_subsegments USING PARQUET AS
CREATE OR REPLACE TEMPORARY VIEW config_subsegments AS
          SELECT    ls.subsegment_code,
                    ls.subsegment_id,
                    ls.subsegment_name,
                    ls.subsegment_description,
                    cs.is_included_in_config
          FROM     
          VALUES    ('alcohol_dependence', TRUE),
                    ('asthma', TRUE),
                    ('atrial_fibrillation', TRUE),
                    ('bronchiectasis', TRUE),
                    ('cancer', TRUE),
                    ('cerebrovascular_disease', TRUE),
                    ('chronic_kidney_disease', TRUE),
                    ('chronic_liver_disease', TRUE),
                    ('chronic_pain', FALSE), -- excluded due to RA overlap
                    ('copd', TRUE),
                    ('coronary_heart_disease', TRUE),
                    ('cystic_fibrosis', FALSE), -- excluded as present from birth
                    ('depression', TRUE),
                    ('diabetes', TRUE),
                    ('epilepsy', TRUE),
                    ('heart_failure', TRUE),
                    ('hypertension', TRUE),
                    ('inflammatory_bowel_disease', TRUE),
                    ('multiple_sclerosis', TRUE),
                    ('osteoarthritis', TRUE),
                    ('osteoporosis', TRUE),
                    ('parkinsons_disease', TRUE),
                    ('peripheral_vascular_disease', TRUE),
                    ('pulmonary_heart_disease', TRUE),
                    ('rheumatoid_arthritis', TRUE),
                    ('sarcoidosis', TRUE),
                    ('serious_mental_illness', TRUE),
                    ('sickle_cell_disease', FALSE), -- excluded as present from birth
                    ('learning_disability', FALSE), -- excluded as present from birth
                    ('physical_disability', TRUE),
                    ('dementia', TRUE),
                    ('intermediate_frailty_risk_hfrs', FALSE), -- excluded as this state can resolve
                    ('high_frailty_risk_hfrs', FALSE), -- excluded as this state can resolve
                    ('end_stage_renal_failure', FALSE), -- excluded due to LTC overlap
                    ('severe_interstitial_lung_disease', TRUE),
                    ('liver_failure', FALSE), -- excluded due to LTC overlap
                    ('neurological_organ_failure', FALSE), -- excluded due to LTC overlap
                    ('severe_copd', FALSE), -- excluded due to LTC overlap
                    ('severe_heart_failure', FALSE), -- excluded due to LTC overlap
                    ('incurable_cancer', FALSE) -- excluded due to LTC overlap
                    AS cs (subsegment, is_included_in_config)
          INNER     JOIN ${param.specification_schema}.lookup_subsegments ls
                    -- case insensitive match ↓
                    ON UPPER(cs.subsegment) = UPPER(ls.subsegment_name)
          WHERE     cs.is_included_in_config IS TRUE


**b - Create dataframes**

This section creates dataframes from the existing versions of `dim_subsegment_combinations` and `breakdown_subsegment_combinations`.

These will be used for the creation of adapted versions of `dim_subsegment_combinations` and `breakdown_subsegment_combinations` below.

In [6]:
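# config_subsegments is the temporary view created in step a; if the persisted
# table is used instead, read f"{incidence_schema}.config_subsegments"
df_config_subsegments = spark.table("config_subsegments")

# ${...} substitution only applies in SQL cells, so use the Python parameters here
df_dim_subsegment_combinations = spark.table(
    f"{segmentation_schema}.dim_subsegment_combinations",
)

df_breakdown_subsegment_combinations = spark.table(
    f"{segmentation_schema}.breakdown_subsegment_combinations",
)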

**c - Create adapted version of `dim_subsegment_combinations`**

Mapping the config to `dim_subsegment_combinations` is complicated by the table being pivoted (one column per subsegment).

- The logic below matches `dim_subsegment_combinations` subsegment column names against the subsegments in the config (case insensitively)
- Columns for subsegments not in the config are dropped
- In each selected subsegment column, true values are replaced with the corresponding subsegment code
- The codes are concatenated to create an updated (`new_`) combination code
- Subsegment names are concatenated to create an updated (`new_`) combination name
- The number of selected subsegments in each row gives the updated (`new_`) condition count
- Finally, subsegment columns are converted back to binary flags
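
The steps above can be sketched for a single row in plain Python. This is an illustration under stated assumptions, not the notebook's Spark implementation: the codes, names and row values are hypothetical.

```python
# Hypothetical config: lowercased subsegment name -> code, and -> original-case name.
name_to_code = {"asthma": "A", "diabetes": "C"}
name_to_display = {"asthma": "Asthma", "diabetes": "Diabetes"}

# One hypothetical pivoted row: one boolean column per subsegment.
row = {
    "subsegment_combination_code": "ABC",
    "asthma": True,
    "chronic_pain": True,  # not in the config, so it is ignored
    "diabetes": True,
}

# Keep only configured subsegment columns that are flagged true in this row.
selected = [col for col in row if col.lower() in name_to_code and row[col]]

new_row = {
    "old_subsegment_combination_code": row["subsegment_combination_code"],
    # Concatenate codes and names of the selected subsegments.
    "new_subsegment_combination_code": "".join(
        name_to_code[col.lower()] for col in selected
    ),
    "new_subsegment_combination_name": " ".join(
        name_to_display[col.lower()] for col in selected
    ),
    # The condition count is recalculated over the selected subset only.
    "new_condition_count": len(selected),
}

# Convert the configured subsegment columns back to binary flags.
new_row.update({col: col in selected for col in row if col.lower() in name_to_code})
print(new_row)
```

In this example the original combination covers three subsegments, but only two are in the config, so the recalculated condition count is 2.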


<blockquote style="color: #D8000C; background-color: #FFD2D2; padding: 10px; border-left: 6px solid #D8000C;">
  <strong>⚠️ Warning:</strong> condition_count logic may need to be updated if config changes (for definitional overlaps)
</blockquote>


In [7]:
%%sql

CREATE SCHEMA IF NOT EXISTS ${param.incidence_schema}

In [8]:
# Step 1: get subsegment_code and subsegment_name pairs from config
## Lowercase subsegment_name for matching to dim_subsegment_combinations later
subsegment_name_to_code = {
    row["subsegment_name"]: row["subsegment_code"]
    for row in df_config_subsegments.select(
        F.lower(F.col("subsegment_name")).alias("subsegment_name"),
        "subsegment_code",
    ).collect()
}

# Step 2: Identify columns to select from dim_subsegment_combinations
## Existing subsegment combination columns will be renamed with old_ prefix
## Otherwise select subsegment columns where those subsegments appear in the config
## Where each subsegment column is true, the value is replaced with the subsegment_code
## This is to enable concatenation of codes for the creation of the new
## subsegment_combination_code later
## Other columns are ignored
columns_to_select = []
additional_columns_with_rename = [
    F.col("subsegment_combination_id").alias("old_subsegment_combination_id"),
    F.col("subsegment_combination_code").alias("old_subsegment_combination_code"),
    F.col("subsegment_combination_name").alias("old_subsegment_combination_name"),
]

for col_name in df_dim_subsegment_combinations.columns:
    # Columns not in the config are skipped; this includes the original
    # combination columns, which are already selected above with an old_ prefix
    if col_name.lower() in subsegment_name_to_code:
        column = (
            F.when(F.col(col_name), subsegment_name_to_code[col_name.lower()])
            .otherwise(None)
            .alias(col_name)
        )
        columns_to_select.append(column)

# Step 3: Combine final columns for selection
final_columns_to_select = additional_columns_with_rename + columns_to_select

# Step 4: Select final columns
df_filtered = df_dim_subsegment_combinations.select(*final_columns_to_select)

# Step 5: Concatenate all subsegment codes into a new combination code
subsegment_columns = [
    col_name
    for col_name in df_filtered.columns
    if col_name.lower() in subsegment_name_to_code
]
df_filtered = df_filtered.withColumn(
    "new_subsegment_combination_code",
    F.concat_ws("", *subsegment_columns),
)

# Step 6: Map lowercased subsegment names back to original case-sensitive versions
name_mapping = {
    row["subsegment_name"].lower(): row["subsegment_name"]
    for row in df_config_subsegments.collect()
}

# Step 7: Concatenate case-sensitive subsegment names into a new combination name
concat_names_expr = F.concat_ws(
    " ",
    *[
        F.when(F.col(col_name).isNotNull(), F.lit(name_mapping[col_name.lower()]))
        for col_name in subsegment_columns
    ],
)
df_filtered = df_filtered.withColumn(
    "new_subsegment_combination_name",
    concat_names_expr,
)

# Step 8: Count non-NULL subsegments across each row (new condition count)
## Note - does not deal with LTC/EOL overlaps as not required for current config
count_non_null_expr = sum(
    [
        F.when(F.col(col_name).isNotNull(), 1).otherwise(0)
        for col_name in subsegment_columns
    ],
)
df_filtered = df_filtered.withColumn("new_condition_count", count_non_null_expr)

# Step 9: Revert subsegment columns back to binary flags
for col_name in df_filtered.columns:
    if col_name.lower() in subsegment_name_to_code:
        df_filtered = df_filtered.withColumn(col_name, F.col(col_name).isNotNull())

# Step 10: Write as table
df_filtered.write.mode("overwrite").saveAsTable(
    f"{incidence_schema}.dim_subsegment_combinations_config",
)


**d - Create adapted version of `breakdown_subsegment_combinations`**

This step creates `breakdown_subsegment_combinations_config`, an adapted version of `breakdown_subsegment_combinations` restricted to the selected subset of subsegments. The table is unpivoted: it contains one row (rather than one column) for each subsegment in each combination.

- The dataframe created in the previous step is unpivoted
- Because that dataframe only contains subsegments in the config, the output rows are already restricted to those subsegments
- The new `subsegment_combination_code` and `subsegment_combination_name` from the previous step are carried through
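
The unpivot can be sketched in plain Python as follows. The rows and column names are hypothetical, and the sketch only shows the change of shape from wide to long.

```python
# Hypothetical wide rows: one boolean column per configured subsegment.
wide_rows = [
    {"new_subsegment_combination_code": "AC", "asthma": True, "diabetes": True},
    {"new_subsegment_combination_code": "C", "asthma": False, "diabetes": True},
]
subsegment_columns = ["asthma", "diabetes"]

# Emit one long-format row per (combination, subsegment) pair where the flag is true.
long_rows = [
    {
        "new_subsegment_combination_code": row["new_subsegment_combination_code"],
        "subsegment_name_lower": col,
    }
    for col in subsegment_columns
    for row in wide_rows
    if row[col]
]
print(len(long_rows))
```

Iterating over columns in the outer loop mirrors building one filtered dataframe per subsegment column and unioning the results.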

In [9]:
# Step 1: Create a copy of df_filtered with unpivoted 'subsegment_name_lower' column
## Identify list of subsegment columns by their data type (all boolean flags)
boolean_columns = [col for col, dtype in df_filtered.dtypes if dtype == "boolean"]

df_long_format = None

# For each subsegment
for col_name in boolean_columns:
    # Filter rows where the subsegment column is True and select necessary columns
    # Add subsegment column name as value within new subsegment_name_lower column
    df_temp = df_filtered.filter(F.col(col_name)).select(
        "old_subsegment_combination_id",
        "old_subsegment_combination_code",
        "old_subsegment_combination_name",
        "new_subsegment_combination_code",
        "new_subsegment_combination_name",
        "new_condition_count",
        F.lit(col_name).alias("subsegment_name_lower"),
    )

    # Union the temporary dataframe with the accumulating dataframe
    df_long_format = (
        df_temp if df_long_format is None else df_long_format.union(df_temp)
    )

# Step 2: Perform case-insensitive join back to config
df_joined = df_long_format.join(
    df_config_subsegments,
    F.lower(df_long_format["subsegment_name_lower"])
    == F.lower(df_config_subsegments["subsegment_name"]),
    "inner",
)

# Step 3: Select specified columns
df_breakdown_subsegment_combinations = df_joined.select(
    "old_subsegment_combination_id",
    "old_subsegment_combination_code",
    "old_subsegment_combination_name",
    "new_subsegment_combination_code",
    "new_subsegment_combination_name",
    "new_condition_count",
    "subsegment_code",
    "subsegment_id",
    "subsegment_name",
    "subsegment_description",
)

# Step 4: Write as table
df_breakdown_subsegment_combinations.write.mode("overwrite").saveAsTable(
    f"{incidence_schema}.breakdown_subsegment_combinations_config",
)
