From 9fa0aea7791edf344d62e00e38b6a11fa4cfcd3d Mon Sep 17 00:00:00 2001 From: Tom Jemmett Date: Wed, 8 Oct 2025 19:55:13 +0100 Subject: [PATCH] adds inequalities methods --- src/nhp/databricks/icb.py | 13 +++++++++++++ src/nhp/databricks/national.py | 25 +++++++++++++++---------- src/nhp/databricks/provider.py | 17 ++++++++++++++--- 3 files changed, 42 insertions(+), 13 deletions(-) diff --git a/src/nhp/databricks/icb.py b/src/nhp/databricks/icb.py index 0410e72..d179200 100644 --- a/src/nhp/databricks/icb.py +++ b/src/nhp/databricks/icb.py @@ -185,3 +185,16 @@ def get_hsa_gams(self): """Get the health status adjustment gams.""" # this is not supported in our data bricks environment currently raise NotImplementedError + + def get_inequalities(self) -> pd.DataFrame: + """Get the inequalities dataframe. + + Returns: + The inequalities dataframe. + """ + return ( + self._spark.read.parquet(f"{self._data_path}/inequalities") + .filter(F.col("icb") == self._icb) + .filter(F.col("fyear") == self._year) + .toPandas() + ) diff --git a/src/nhp/databricks/national.py b/src/nhp/databricks/national.py index cdd0277..c92e664 100644 --- a/src/nhp/databricks/national.py +++ b/src/nhp/databricks/national.py @@ -55,9 +55,7 @@ def create( :return: a function to initialise the object :rtype: Callable[[str, str], Databricks] """ - return lambda fyear, _: DatabricksNational( - spark, data_path, fyear, sample_rate, seed - ) + return lambda fyear, _: DatabricksNational(spark, data_path, fyear, sample_rate, seed) def get_ip(self) -> pd.DataFrame: """Get the inpatients dataframe. @@ -98,14 +96,10 @@ def get_op(self) -> pd.DataFrame: # TODO: temporary fix, see #353 .withColumn("sushrg_trimmed", F.lit("HRG")) .withColumn("imd_quintile", F.lit(0)) - .groupBy( - op.drop("index", "fyear", "attendances", "tele_attendances").columns - ) + .groupBy(op.drop("index", "fyear", "attendances", "tele_attendances").columns) .agg( (F.sum("attendances") * self._sample_rate).alias("attendances"), - (F.sum("tele_attendances") * self._sample_rate).alias( - "tele_attendances" - ), + (F.sum("tele_attendances") * self._sample_rate).alias("tele_attendances"), ) # TODO: how do we make this stable? at the moment we can't use full model results with # national @@ -209,4 +203,15 @@ def get_hsa_gams(self): """Get the health status adjustment gams.""" # this is not supported in our data bricks environment currently raise NotImplementedError - raise NotImplementedError + + def get_inequalities(self) -> pd.DataFrame: + """Get the inequalities dataframe. + + Returns: + The inequalities dataframe. + """ + return ( + self._spark.read.parquet(f"{self._data_path}/inequalities") + .filter(F.col("fyear") == self._year) + .toPandas() + ) diff --git a/src/nhp/databricks/provider.py b/src/nhp/databricks/provider.py index a331493..6ac1663 100644 --- a/src/nhp/databricks/provider.py +++ b/src/nhp/databricks/provider.py @@ -34,9 +34,7 @@ def create(spark: SparkSession, data_path: str) -> Callable[[int, str], Any]: :return: a function to initialise the object :rtype: Callable[[str, str], Databricks] """ - return lambda fyear, dataset: DatabricksProvider( - spark, data_path, fyear, dataset - ) + return lambda fyear, dataset: DatabricksProvider(spark, data_path, fyear, dataset) @property def _apc(self): @@ -144,3 +142,16 @@ def get_hsa_gams(self): """Get the health status adjustment gams.""" # this is not supported in our data bricks environment currently raise NotImplementedError + + def get_inequalities(self) -> pd.DataFrame: + """Get the inequalities dataframe. + + Returns: + The inequalities dataframe. + """ + return ( + self._spark.read.parquet(f"{self._data_path}/inequalities") + .filter(F.col("dataset") == self._dataset) + .filter(F.col("fyear") == self._year) + .toPandas() + )