# New Hospitals Model

This notebook runs the NHP model and produces the aggregated results.

In [1]:
params_file = "test/queue/RL4.json"

## Setup

Load the required packages

In [2]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np

import os
import shutil
import json
from datetime import datetime

from model.aae import AaEModel
from model.inpatients import InpatientsModel
from model.outpatients import OutpatientsModel

We need to create a folder ready to store the results.

In [3]:
with open(params_file, "r", encoding="UTF-8") as pf:
  params = json.load(pf)

dataset = params["input_data"]
scenario = params["name"]
model_runs = params["model_runs"]

run_time = datetime.now().strftime("%Y%m%d_%H%M%S")

results_path = f"data/{dataset}/results/{scenario}/{run_time}"

os.makedirs(results_path)
shutil.copy(params_file, f"{results_path}/params.json")

'data/RL4/results/test/20220401_115354/params.json'

We will run the model in parallel. By default, use all available CPU cores. You can set this to a lower value to use less resources, but it will take longer to run the model.

In [4]:
cpus = os.cpu_count()
cpus

12

When we run the model in parallel it's slightly more efficient to run a batch of model runs. Batches of 4 or 8 seems to be most efficient. This value should be a power of 2.

In [5]:
batch_size = 2 ** 2
batch_size

4

## Run the model

In [6]:
(AaEModel(results_path)
# add 1 because of the principal run
  .multi_model_runs(0, model_runs + 1, cpus, batch_size)
)

1028it [01:21, 12.66it/s]                          

Model runs completed: 1025 / 1025





In [7]:
(InpatientsModel(results_path)
  .multi_model_runs(0, model_runs + 1, cpus, batch_size)
)

1028it [07:24,  2.31it/s]                          

Model runs completed: 1025 / 1025





In [8]:
(OutpatientsModel(results_path)
  .multi_model_runs(0, model_runs + 1, cpus, batch_size)
)

1028it [04:53,  3.50it/s]                          

Model runs completed: 1025 / 1025





## Aggregate Results

### Load Baseline Data

We want to aggregate ages into groups, this function helps us do that.

In [9]:
def age_groups(x):
  return pd.cut(
    x,
    [0, 5, 15, 35, 50, 65, 85, 1000],
    False,
    ["0-4", "5-14", "15-34", "35-49", "50-64", "65-84", "85+"]
  ).astype(str)

This function makes it easier to load the datasets.

In [10]:
def load_dataset(type, *args):
  cols = ["rn", "age", "sex", "ethnos", "imd04_decile"] + list(args)[0]
  
  df = (pq
    .read_pandas(f"data/{dataset}/{type}.parquet", cols)
    .to_pandas()
    .set_index("rn")
  )
  df["age_group"] = age_groups(df["age"])
  return df

We can now load all of our baseline data. We show the number of rows loaded from that dataset, and the amount of activity in that dataset.

In [11]:
aae = load_dataset("aae", [
  "aedepttype", "aearrivalmode", "arrivals"
])
(len(aae.index), sum(aae.arrivals))

(44057, 177532)

In [12]:
ip = load_dataset("ip", [
  "tretspef", "classpat", "admidate", "disdate", "speldur", "epitype", "admimeth", "dismeth"
])
(len(ip.index), len(ip.index)) # inpatients is row level

(132403, 132403)

In [13]:
op = load_dataset("op", [
  "tretspef", "is_first", "has_procedures", "attendances", "tele_attendances"
])
(len(op.index), sum(op.attendances) + sum(op.tele_attendances))

(236972, 936404)

### Load model results

Our results are stored in a datalake structure, use `pyarrow` to load the individual files into a single dataset.

In [14]:
partitioning = pa.dataset.HivePartitioning(pa.schema([("model_run", pa.int32())]))
def load_model_results(type):
  ds = pq.ParquetDataset(
    f"{results_path}/{type}/",
    partitioning = partitioning,
    use_legacy_dataset = False
  )
  df = ds.read_pandas().to_pandas()
  return df.set_index("rn")

#### Process IP data

In [15]:
ip_mr = load_model_results("ip")


Rows that have a classpat of -1 need to be moved into OP.

In [16]:
ip_op_row_ix = ip_mr["classpat"] == "-1"

ip_op_rows = (ip
  .merge(ip_mr[ip_op_row_ix][["model_run"]], left_index = True, right_index = True)
  .value_counts(["age_group", "sex", "imd04_decile", "ethnos", "tretspef", "model_run"])
  .to_frame("attendances")
  .reset_index()
)
ip_op_rows["is_first"] = False
ip_op_rows["has_procedures"] = True
ip_op_rows["tele_attendances"] = 0

Remove the ip to op rows.

In [17]:
ip_mr = (ip
  .drop(["classpat", "speldur"], axis = "columns")
  .merge(ip_mr[~ip_op_row_ix], left_index = True, right_index = True)
)

Join the results and the baseline data together

In [18]:
ip["type"] = "baseline"
ip_mr["type"] = "model"
ip["model_run"] = 0
ip = pd.concat([ip, ip_mr])
# free up memory
ip_mr = None
# create an admission group column
ip["admission_group"] = "non-elective"
ip.loc[ip["admimeth"].str.startswith("1"), "admission_group"] = "elective"
# quick dq fix: convert any "non-elective" daycases to "elective"
ip.loc[ip["classpat"].isin(["2", "3"]), "admission_group"] = "elective"
# create a "pod" column, starting with the admission group
ip["pod"] = ip["admission_group"]
ip.loc[ip["classpat"].isin(["1", "4"]), "pod"] += "_admission"
ip.loc[ip["classpat"].isin(["2", "3"]), "pod"] += "_daycase"
ip.loc[ip["classpat"] == "5", "pod"] += "_birth-episode"
ip["beddays"] = ip["speldur"] + 1

We can now aggregate our results

In [19]:
ip_agg = (ip
  .groupby(["age_group", "sex", "tretspef", "type", "model_run", "pod"], as_index = False)
  .agg({ "speldur": len, "beddays": np.sum })
).rename({ "speldur": "admissions" }, axis = "columns")

#### Process OP data

In [20]:
# make sure to convert imd04_decile to a string
#  - groupby with categorical columns can cause issues
op["imd04_decile"] = op["imd04_decile"].astype(str)

In [21]:
op_mr = (op[["age_group", "sex", "imd04_decile", "ethnos", "tretspef", "is_first", "has_procedures"]]
  .merge(load_model_results("op"), left_index = True, right_index = True)
).reset_index(drop = True)

Union the OP model results with the IP rows that were converted to outpatients.

In [22]:
op_mr = pd.concat([op_mr, ip_op_rows])

Join the results and the baseline data together

In [23]:
op["type"] = "baseline"
op_mr["type"] = "model"
op["model_run"] = 0
op = (pd.concat([op[op_mr.columns.tolist()].reset_index(drop = True), op_mr])
  .groupby([
    "age_group", "sex", "imd04_decile", "ethnos", "tretspef",
    "is_first", "has_procedures", "type", "model_run"
  ], as_index = False)
  .agg(sum)
)
op.loc[ op["is_first"], "pod"] = "op_first"
op.loc[~op["is_first"], "pod"] = "op_follow-up"
op.loc[op["has_procedures"], "pod"] = "op_procedure"
# repromote imd04_decile to categorial, make sure to use ip's categories
op["imd04_decile"] = pd.Categorical(
  op["imd04_decile"].astype("category"),
  ip["imd04_decile"].cat.categories
)

We can now aggregate our results

In [24]:
op_agg = (op
  .groupby(["age_group", "sex", "tretspef", "pod", "type", "model_run"], as_index = False)
  .agg({ "attendances": np.sum, "tele_attendances": np.sum })
)

#### Process A&E Data

In [25]:
# create the pod & measure columns
aae["pod"] = "type-" + aae["aedepttype"]
aae["measure"] = "walk-in"
aae.loc[aae["aearrivalmode"] == "1", "measure"] = "ambulance"

In [26]:
aae_mr = (aae[[
    "age_group", "sex", "imd04_decile", "ethnos", "aedepttype", "aearrivalmode", "pod", "measure"
  ]]
  .merge(load_model_results("aae"), left_index = True, right_index = True)
  .reset_index(drop = True)
)

Join the results and the baseline data together

In [27]:
aae["type"] = "baseline"
aae_mr["type"] = "model"

aae["model_run"] = 0

aae = pd.concat([aae[aae_mr.columns.tolist()].reset_index(drop = True), aae_mr])

We can now aggregate our results

In [28]:
aae_agg = (aae
  .groupby(["age_group", "sex", "pod", "type", "model_run", "measure"], as_index = False)
  .agg({ "arrivals": np.sum })
  .rename({"arrivals": "value"}, axis = "columns")
  .assign(dataset = "aae", tretspef = "X01")
)

### Write Aggregate Results

First we need to union all of the rows together. To do this we need to pivot our OP and IP data into a longer format. The A&E data is already in this form.

In [29]:
melted_data = [
  pd.melt(
    op_agg.assign(dataset = "op"),
    ["age_group", "sex", "tretspef", "dataset", "pod", "type", "model_run"],
    ["attendances", "tele_attendances"],
    "measure"
  ),

  pd.melt(
    ip_agg.assign(dataset = "ip"),
    ["age_group", "sex", "tretspef", "dataset", "pod", "type", "model_run"],
    ["admissions", "beddays"],
    "measure"
  ),

  aae_agg
]
all_agg = pd.concat(melted_data).sort_values([
  "sex", "dataset", "pod", "measure", "tretspef", "age_group", "type", "model_run"
])
# update the type of the principal runs
all_agg.loc[(all_agg["model_run"] == 0) & (all_agg["type"] == "model"), "type"] = "principal"

We can now save the model results

In [30]:
all_agg.to_parquet(f"{results_path}/model_results.parquet", index = False)

### Process Change Factors

In [31]:
def load_change_factors(dataset, model_run):
  path = f"{results_path}/{dataset}_change_factors/{model_run}.json"
  with open(path, "r", encoding = "UTF-8") as cfp:
    cf = (pd.concat({
        k1: pd.DataFrame({
          k2: v2 if isinstance(v2, dict) else { '-': v2 }
          for k2, v2 in v1.items()
        })
        for k1, v1 in json.load(cfp).items()
      })
      .rename_axis(["change_factor", "type"])
      .reset_index()
      .assign(dataset = dataset, model_run = model_run)
    )
  return cf

In [32]:
change_factors = pd.concat([
  load_change_factors(ds, mr)
  for ds in ["ip"]
  for mr in range(model_runs)
])

In [33]:
change_factors.to_csv(f"{results_path}/change_factors.csv", index = False)

#### Delete model run files

We no longer need to keep all of the underlying model run files.

In [34]:
for i in ["aae", "ip", "op", "ip_change_factors"]:
  shutil.rmtree(f"{results_path}/{i}")