# QA data checks

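This Databricks notebook QA-checks model data for versions > v3.0. For each trust it compares the `dev` data against the latest data version, creating counts by POD for each activity type (inpatients, outpatients and A&E) and comparing the mitigator summaries.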

In [0]:
import pandas as pd
import sys
import pyspark.sql.functions as F
%load_ext autoreload
%autoreload 2
from nhpy import process_data

sys.path.append(spark.conf.get("bundle.sourcePath", "."))

trusts = [
    'RCF',
    'RDU',
    'RGN',
    'RGP',
    'RCX',
    'RBT',
    'RN5',
    'RAS',
    'RQW',
    'RWG',
    'R1H',
    'RWE',
    'RVR',
    'RNQ',
    'RH5',
    'RA9',
    'R0A',
    'RXC',
    'RTX',
    'RH8',
    'RHW',
    'RXN',
    'RYJ',
    'RX1',
    'RGR',
    'RD8',
    'REF'
]

year = 2023

In [0]:
import re


def _version_sort_key(name):
    """Return the numeric components of a version folder name, e.g. "v3.10" -> (3, 10).

    Sorting on this key orders versions numerically; a plain string sort would
    place "v10.0" before "v3.0" and make `data_versions[-1]` point at the wrong
    (older) version.
    """
    return tuple(int(part) for part in re.findall(r"\d+", name))


# Version directories (names starting with "v") under the model data volume,
# ordered oldest -> newest so that `data_versions[-1]` is the latest version.
data_versions = sorted(
    (
        f.name.strip("/")
        for f in dbutils.fs.ls('/Volumes/nhp/model_data/files')
        if f.isDir() and f.name.startswith("v")
    ),
    key=_version_sort_key,
)
data_versions

## Inpatients

In [0]:


def aggregate_data_ip(data, version):
    """Summarise inpatient activity by POD for one data version.

    Args:
        data: inpatient rows with ``pod``, ``rn`` and ``speldur`` columns.
        version: label used to prefix the output columns (e.g. ``"dev"``).

    Returns:
        DataFrame indexed by ``pod`` with ``{version}_admissions`` (count of
        non-null ``rn``) and ``{version}_beddays`` (sum of ``speldur + 1``).

    The input frame is left unchanged.
    """
    # assign() returns a new frame, so the caller's DataFrame does not gain a
    # ``beddays`` column as a side effect (the original wrote it in place).
    with_beddays = data.assign(beddays=data["speldur"] + 1)
    return (
        with_beddays.groupby("pod")
        .agg({"rn": "count", "beddays": "sum"})
        .rename(
            columns={"rn": f"{version}_admissions", "beddays": f"{version}_beddays"}
        )
    )

data_dict_ip = {}

for trust in trusts:
    # Aggregate each data version separately: "dev" first, then the last
    # entry of `data_versions` (if there is one).
    version_tables = []
    for version in ["dev"] + data_versions[-1:]:
        raw = pd.read_parquet(f"/Volumes/nhp/model_data/files/{version}/ip/fyear={year}/dataset={trust}")
        with_pod = process_data.add_pod_to_data_ip(raw)
        version_tables.append(aggregate_data_ip(with_pod, version))

    # Put the versions side by side on the POD index; PODs missing from one
    # version are filled with 0 and every count is cast back to int.
    trust_table = version_tables[0].copy()
    for other in version_tables[1:]:
        trust_table = (
            trust_table.merge(other, left_index=True, right_index=True, how='outer')
            .fillna(0)
            .astype(int)
        )
    trust_table['trust'] = trust
    data_dict_ip[trust] = trust_table


## Outpatients


In [0]:
def aggregate_data_op(data, version):
    """Total outpatient attendances and tele-attendances per POD.

    The output is indexed by ``pod``. Its two columns are prefixed with
    *version* so that several data versions can be merged side by side.
    """
    totals = data.groupby("pod")[["attendances", "tele_attendances"]].sum()
    totals.columns = [f"{version}_attendances", f"{version}_tele_attendances"]
    return totals


data_dict_op = {}

for trust in trusts:
    # One POD-level summary per data version: "dev", then the last entry of
    # `data_versions` (if any).
    version_tables = []
    for version in ["dev"] + data_versions[-1:]:
        raw = pd.read_parquet(
            f"/Volumes/nhp/model_data/files/{version}/op/fyear={year}/dataset={trust}"
        )
        with_pod = process_data.add_pod_to_data_op(raw)
        version_tables.append(aggregate_data_op(with_pod, version))

    # Outer-join the versions on POD so rows present in only one version
    # still appear (as 0) in the other.
    trust_table = version_tables[0].copy()
    for other in version_tables[1:]:
        joined = trust_table.merge(other, left_index=True, right_index=True, how="outer")
        trust_table = joined.fillna(0).astype(int)
    trust_table["trust"] = trust
    data_dict_op[trust] = trust_table

## A&E

In [0]:
# aae functions


def add_pod_to_data_aae(data):
    """Tag each A&E row with a POD derived from its department type.

    The ``pod`` column ("aae_type-" followed by ``aedepttype``) is written onto
    *data* in place, and the same frame is returned so the call can be chained.
    """
    pod_labels = "aae_type-" + data["aedepttype"]
    data["pod"] = pod_labels
    return data


def aggregate_data_aae(data, version):
    """Total A&E arrivals for each (pod, group) pair.

    The single output column is named ``{version}_arrivals`` so that several
    data versions can be merged side by side on the (pod, group) index.
    """
    arrivals_col = f"{version}_arrivals"
    return data.groupby(["pod", "group"]).agg(**{arrivals_col: ("arrivals", "sum")})

data_dict_aae = {}

for trust in trusts:
    # One (pod, group)-level summary per data version: "dev", then the last
    # entry of `data_versions` (if any).
    version_tables = []
    for version in ["dev"] + data_versions[-1:]:
        raw = pd.read_parquet(
            f"/Volumes/nhp/model_data/files/{version}/aae/fyear={year}/dataset={trust}"
        )
        with_pod = add_pod_to_data_aae(raw)
        version_tables.append(aggregate_data_aae(with_pod, version))

    # Outer-join on the (pod, group) index; gaps become 0 and counts stay int.
    trust_table = version_tables[0].copy()
    for other in version_tables[1:]:
        joined = trust_table.merge(other, left_index=True, right_index=True, how="outer")
        trust_table = joined.fillna(0).astype(int)
    trust_table["trust"] = trust
    data_dict_aae[trust] = trust_table

## Mitigators

In [0]:
# IP mitigators


def _summarise_ip_strategies(path):
    """Per strategy: number of rows (as ``count``) and total ``sample_rate``."""
    summary = pd.read_parquet(path).groupby("strategy").agg({"rn": "count", "sample_rate": "sum"})
    return summary.rename(columns={"rn": "count"})


ip_mitigators_dict = {}

for version in ["dev"] + data_versions[-1:]:
    avoidance = _summarise_ip_strategies(
        f"/Volumes/nhp/model_data/files/{version}/ip_activity_avoidance_strategies/fyear={year}"
    )
    efficiencies = _summarise_ip_strategies(
        f"/Volumes/nhp/model_data/files/{version}/ip_efficiencies_strategies/fyear={year}"
    )
    # Stack both strategy families into one table for this version.
    ip_mitigators_dict[version] = pd.concat([avoidance, efficiencies], axis=0)

# Side-by-side comparison keyed on strategy; both tables share the same
# column names, so every column receives a version suffix.
latest = data_versions[-1]
ip_mitigators = ip_mitigators_dict["dev"].merge(
    ip_mitigators_dict[latest],
    left_index=True,
    right_index=True,
    how="outer",
    suffixes=('_dev', f'_{latest}'),
)

In [0]:
# OP mitigators

op_mitigators_dict = {}

for version in ["dev"] + data_versions[-1:]:
    op = pd.read_parquet(
                f"/Volumes/nhp/model_data/files/{version}/op/fyear={year}")
    op_mitigators = process_data.get_all_op_mitigators(op)
    op_mitigators_dict[version] = op_mitigators

latest_version = data_versions[-1]
# Check every mitigator present in *either* version. Previously only the
# columns from the last loop iteration (the latest version) were used, so
# mitigators that exist only in `dev` were never compared. Note that `merge`
# only suffixes columns present in both frames, so a one-sided mitigator has
# no `_dev`/`_<version>` pair and will be reported by the check cell.
op_mitigator_names = op_mitigators_dict["dev"].columns.union(
    op_mitigators_dict[latest_version].columns
)
combined_op_mitigators = op_mitigators_dict["dev"].merge(
    op_mitigators_dict[latest_version],
    left_index=True,
    right_index=True,
    how="outer",
    suffixes=('_dev', f'_{latest_version}'),
)


In [0]:
# AE mitigators

aae_mitigators_dict = {}

for version in ["dev"] + data_versions[-1:]:
    aae = pd.read_parquet(
                f"/Volumes/nhp/model_data/files/{version}/aae/fyear={year}")
    aae_mitigators = process_data.get_all_ae_mitigators(aae)
    aae_mitigators_dict[version] = aae_mitigators

latest_version = data_versions[-1]
# Check every mitigator present in *either* version. Previously only the
# columns from the last loop iteration (the latest version) were used, so
# mitigators that exist only in `dev` were never compared. Note that `merge`
# only suffixes columns present in both frames, so a one-sided mitigator has
# no `_dev`/`_<version>` pair and will be reported by the check cell.
aae_mitigator_names = aae_mitigators_dict["dev"].columns.union(
    aae_mitigators_dict[latest_version].columns
)
combined_aae_mitigators = aae_mitigators_dict["dev"].merge(
    aae_mitigators_dict[latest_version],
    left_index=True,
    right_index=True,
    how="outer",
    suffixes=('_dev', f'_{latest_version}'),
)

## Combine results

In [0]:
from datetime import datetime

# Date stamp used in the filenames of the QA CSVs written by these cells.
today_date = datetime.now().strftime("%Y-%m-%d")
latest_version = data_versions[-1]

full_df_ip = pd.concat(data_dict_ip.values()).reset_index().set_index(['trust', 'pod'])

# Compare each metric explicitly instead of using `assert` under a bare
# `except:`. That version stopped at the first failing metric, is disabled
# under `python -O`, and reported unrelated errors as a data mismatch.
ip_metrics_with_differences = []
for metric in ("admissions", "beddays"):
    latest_col = f"{latest_version}_{metric}"
    if latest_col not in full_df_ip.columns or not full_df_ip[f"dev_{metric}"].equals(full_df_ip[latest_col]):
        ip_metrics_with_differences.append(metric)
if ip_metrics_with_differences:
    print("Error! Please check file")
    print(f"Metrics with differences: {ip_metrics_with_differences}")
full_df_ip.to_csv(f"{today_date}_QA_ip.csv")

In [0]:
full_df_op = pd.concat(data_dict_op.values()).reset_index().set_index(['trust', 'pod'])
latest_version = data_versions[-1]

# Compare each metric explicitly instead of using `assert` under a bare
# `except:`. That version stopped at the first failing metric, is disabled
# under `python -O`, and reported unrelated errors as a data mismatch.
op_metrics_with_differences = []
for metric in ("attendances", "tele_attendances"):
    latest_col = f"{latest_version}_{metric}"
    if latest_col not in full_df_op.columns or not full_df_op[f"dev_{metric}"].equals(full_df_op[latest_col]):
        op_metrics_with_differences.append(metric)
if op_metrics_with_differences:
    print("Error! Please check file")
    print(f"Metrics with differences: {op_metrics_with_differences}")
full_df_op.to_csv(f"{today_date}_QA_op.csv")

In [0]:
full_df_aae = pd.concat(data_dict_aae.values()).reset_index().set_index(['trust', 'pod'])
latest_version = data_versions[-1]

# Compare explicitly instead of using `assert` under a bare `except:`, which
# is disabled under `python -O` and reported unrelated errors as a mismatch.
latest_col = f"{latest_version}_arrivals"
if latest_col not in full_df_aae.columns or not full_df_aae["dev_arrivals"].equals(full_df_aae[latest_col]):
    print("Error! Please check file")
    print("Metrics with differences: ['arrivals']")
full_df_aae.to_csv(f"{today_date}_QA_aae.csv")

In [0]:
# IP mitigators
latest_version = data_versions[-1]

# Per-strategy difference in row counts (NaN where a strategy is missing from
# one version after the outer merge). It is now computed unconditionally.
# Previously it was only assigned inside `except`, so displaying `diffs`
# raised NameError (or showed a stale value) when every check passed.
diffs = ip_mitigators["count_dev"] - ip_mitigators[f"count_{latest_version}"]

# Check both metrics explicitly rather than with asserts under a bare except.
ip_mitigator_metrics_with_differences = [
    metric
    for metric in ("count", "sample_rate")
    if not ip_mitigators[f"{metric}_dev"].equals(ip_mitigators[f"{metric}_{latest_version}"])
]
if ip_mitigator_metrics_with_differences:
    print("Error! Please check file")
    print(f"Metrics with differences: {ip_mitigator_metrics_with_differences}")
ip_mitigators.to_csv(f"{today_date}_QA_ip_mitigators.csv")
diffs

In [0]:
# OP mitigators
latest_version = data_versions[-1]

# Check every mitigator and report all failures. The previous version stopped
# at the first one, and its message lacked the `f` prefix, so it printed a
# literal "{col}".
mismatched_op_mitigators = []
for col in op_mitigator_names:
    dev_col, latest_col = f"{col}_dev", f"{col}_{latest_version}"
    # `merge` only suffixes columns present in both versions, so a missing
    # suffixed column means the mitigator exists in only one version.
    if dev_col not in combined_op_mitigators.columns or latest_col not in combined_op_mitigators.columns:
        mismatched_op_mitigators.append(col)
    elif not combined_op_mitigators[dev_col].equals(combined_op_mitigators[latest_col]):
        mismatched_op_mitigators.append(col)
if mismatched_op_mitigators:
    print("Error! Please check file")
    for col in mismatched_op_mitigators:
        print(f"Mitigator with error: {col}")
combined_op_mitigators.sort_index(axis=1).to_csv(f"{today_date}_QA_op_mitigators.csv")

In [0]:
# AAE mitigators
latest_version = data_versions[-1]

# Check every mitigator and report all failures. The previous version stopped
# at the first one, and its message lacked the `f` prefix, so it printed a
# literal "{col}".
mismatched_aae_mitigators = []
for col in aae_mitigator_names:
    dev_col, latest_col = f"{col}_dev", f"{col}_{latest_version}"
    # `merge` only suffixes columns present in both versions, so a missing
    # suffixed column means the mitigator exists in only one version.
    if dev_col not in combined_aae_mitigators.columns or latest_col not in combined_aae_mitigators.columns:
        mismatched_aae_mitigators.append(col)
    elif not combined_aae_mitigators[dev_col].equals(combined_aae_mitigators[latest_col]):
        mismatched_aae_mitigators.append(col)
if mismatched_aae_mitigators:
    print("Error! Please check file")
    for col in mismatched_aae_mitigators:
        print(f"Mitigator with error: {col}")
combined_aae_mitigators.sort_index(axis=1).to_csv(f"{today_date}_QA_aae_mitigators.csv")