## Admitted patient care

In [0]:
# PySpark DataFrame-API version of the query above
from pyspark.sql import functions as F

# Source tables
apc_df = spark.table("nhp.raw_data.apc")
activity_mitigation_df = spark.table("activity_mitigation_table")
icd10_codes_gabriel_df = spark.table("icd10_codes_gabriel")

# Both joins are left joins, so every APC row is kept: mitigation rows are
# attached on `epikey`, the ICD-10 lookup row on the primary diagnosis code.
joined_df = (
    apc_df
    .join(
        activity_mitigation_df,
        on=apc_df.epikey == activity_mitigation_df.epikey,
        how="left",
    )
    .join(
        icd10_codes_gabriel_df,
        on=apc_df.primary_diagnosis == icd10_codes_gabriel_df.icd10,
        how="left",
    )
)

# Inclusive financial-year window (lower and upper bound both kept).
in_study_period = apc_df.fyear.between('201112', '201920')

# NOTE(review): every sex code other than '1' (including null) is labelled
# 'f' here - confirm that this is intended.
sex_label = F.when(apc_df.sex == '1', 'm').otherwise('f').alias("sex")

# Dimensions of the aggregated output; `group` is exposed as "pod".
grouping_cols = [
    apc_df.fyear,
    apc_df.group.alias("pod"),
    apc_df.age,
    apc_df.resgor_ons,
    sex_label,
    icd10_codes_gabriel_df.chapter_number,
    apc_df.imd_quintile,
    activity_mitigation_df.mitigation_type,
]

# Activity = number of non-null APC episode keys in each group.
result_df = (
    joined_df
    .filter(in_study_period)
    .groupBy(*grouping_cols)
    .agg(F.count(apc_df.epikey).alias("activity"))
)


In [0]:
from pyspark.sql import functions as F

# Collapse the detailed breakdown to total activity per financial year,
# pod and mitigation type.
summary_keys = ["fyear", "pod", "mitigation_type"]
total_activity = F.sum("activity").alias("activity")

summary_df = result_df.groupBy(*summary_keys).agg(total_activity)


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One window per (pod, mitigation_type) series, ordered by financial year.
w = Window.partitionBy("pod", "mitigation_type").orderBy("fyear")

# Baseline = activity in the earliest financial year of each series.
# The frame is widened explicitly to the whole partition so the result does
# not rely on the implicit default frame that an ordered window receives.
first_activity = F.first("activity").over(
    w.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# Index each year's activity against its series baseline, so the earliest
# year of every series has activity_indexed == 1.0.
indexed_df = (
    summary_df
    .withColumn("first_activity", first_activity)
    .withColumn("activity_indexed", F.col("activity") / F.col("first_activity"))
)


In [0]:
import seaborn as sns
import matplotlib.pyplot as plt

# Set style and color palette (customize if needed)
sns.set(style="whitegrid")

# Collect the (small, aggregated) Spark result to pandas once and reuse it
# for both the palette and the plot.
plot_pdf = indexed_df.toPandas()

# One fixed colour per mitigation type. A dict palette keeps each mitigation
# type the same colour in every facet, even when a facet only contains a
# subset of the types. Null mitigation types are skipped.
mitigation_levels = sorted(plot_pdf["mitigation_type"].dropna().unique())
palette = dict(
    zip(
        mitigation_levels,
        sns.color_palette("husl", n_colors=len(mitigation_levels)),
    )
)

# Create FacetGrid: one panel per pod, each with its own y-axis scale
g = sns.FacetGrid(plot_pdf, col="pod", col_wrap=3, sharey=False, height=4)

# Add lineplots to each facet
g.map_dataframe(sns.lineplot,
                x="fyear",
                y="activity_indexed",
                hue="mitigation_type",
                palette=palette,
                linewidth=1)

# Adjust legend
g.add_legend(title=None, ncol=2)
g.set_titles(col_template="{col_name}")
g.set_axis_labels("Financial year", "Activity (indexed)")

# Improve layout
plt.subplots_adjust(top=0.9)
g.fig.suptitle("Indexed trends by pod and mitigation for gam/bam data")
plt.tight_layout(rect=[0, 0, 1, 0.95])  # leave space for title


## Outpatient attendances (opa)

In [0]:
# Mitigator rows for the opa data, loaded for inspection in the next cell.
opa_mitig = spark.read.table("nhp.raw_data.opa_mitigators")

In [0]:
# Show the distinct values of the `strategy` column.
display(opa_mitig.select("strategy").distinct())

In [0]:
from pyspark.sql.functions import col, when, countDistinct, lit

opa_df = spark.table("nhp.raw_data.opa")

# Exclude strategies whose name starts with "convert_to_tele".
# `startswith` matches the prefix literally; a SQL LIKE pattern would treat
# each `_` as a single-character wildcard and could match unintended names.
opa_mitigators_df = spark.table("nhp.raw_data.opa_mitigators").filter(
    ~col("strategy").startswith("convert_to_tele")
)

# An attendance is matched to a mitigator row on attendkey, provider and
# financial year together.
mitigator_match = (
    (col("a.attendkey") == col("b.attendkey"))
    & (col("a.provider") == col("b.provider"))
    & (col("a.fyear") == col("b.fyear"))
)

# Rows with a remaining mitigator match are labelled "deAdopt", all others
# "none".
mitigation_type = (
    when(col("b.strategy").isNotNull(), "deAdopt")
    .otherwise("none")
    .alias("mitigation_type")
)

# NOTE: this rebinds `result_df`, replacing the admitted-patient-care result
# held under the same name.
# countDistinct counts each attendkey once per group even when the left join
# yields several mitigator rows for it.
result_df = (
    opa_df.alias("a")
    .join(opa_mitigators_df.alias("b"), mitigator_match, "left")
    .filter(col("a.fyear").between(201112, 201920))
    .groupBy(
        col("a.fyear"),
        col("a.age"),
        col("a.sex"),
        col("a.type"),
        col("a.imd_quintile"),
        col("a.resgor_ons"),
        mitigation_type,
    )
    .agg(countDistinct(col("a.attendkey")).alias("activity"))
    .withColumn("pod", lit("opa"))
)

In [0]:
# Render the current `result_df` in the notebook output.
display(result_df)