# Talking Therapies
Data exploration worksheet.

## Data load
- Function imports
- Base table loads

### Set up

In [0]:
# flags controlling which optional steps (checks, display, export) are run in this notebook
global flag_data_check, flag_data_display, flag_data_export

flag_data_check = False # should data checks be performed?
flag_data_display = False # should data be displayed?
flag_data_export = True # should data be exported?

In [0]:
# imports
import pyspark.sql.functions as F
from pyspark.sql import Window as W

In [0]:
# set path to iapt referrals
lake = "udalstdatacuratedprod.dfs.core.windows.net"
container = "restricted"
file = "/patientlevel/MESH/IAPT/IDS101referral/Published/2/"
iapt_path_referral = "abfss://" + container + "@" + lake + file

# set path to iapt mpi
lake = "udalstdatacuratedprod.dfs.core.windows.net"
container = "restricted"
file = "/patientlevel/MESH/IAPT/IDS001mpi/Published/3/"
iapt_path_mpi = "abfss://" + container + "@" + lake + file

# set path to iapt contacts
lake = "udalstdatacuratedprod.dfs.core.windows.net"
container = "restricted"
file = "/patientlevel/MESH/IAPT/IDS201carecontact/Published/2/"
iapt_path_contact = "abfss://" + container + "@" + lake + file

# set path to organisation details
lake = "udalstdatacuratedprod.dfs.core.windows.net"
container = "unrestricted"
file = "/reference/UKHD/ODS_API/vwOrganisation_SCD_IsLatestEqualsOneWithRole"
ods_path = "abfss://" + container + "@" + lake + file

# set path to lake mart
# https://udalstdataanalysisprod.dfs.core.windows.net/analytics-projects/StrategyUnit/StrategyUnit/Evaluation/E004546-0001_Talking_Therapies/
lake = "udalstdataanalysisprod.dfs.core.windows.net"
container = "analytics-projects"
folder = "/StrategyUnit/StrategyUnit/Evaluation/E004546-0001_Talking_Therapies/"
eval_path = "abfss://" + container + "@" + lake + folder
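
The path cells above repeat the same string concatenation five times. A minimal sketch of a helper that builds the same `abfss://` URIs is shown here; the function name `build_abfss_path` and the constant `LAKE_CURATED` are hypothetical and not part of the original notebook.

```python
def build_abfss_path(container: str, lake: str, path: str) -> str:
    """Return an abfss:// URI from a container, a storage host and a path.

    Hypothetical helper: mirrors "abfss://" + container + "@" + lake + path
    as used in the cells above.
    """
    # strip any leading "/" so exactly one "/" separates the host from the path
    return f"abfss://{container}@{lake}/{path.lstrip('/')}"


# storage host copied from the cells above
LAKE_CURATED = "udalstdatacuratedprod.dfs.core.windows.net"

example_path = build_abfss_path(
    container="restricted",
    lake=LAKE_CURATED,
    path="/patientlevel/MESH/IAPT/IDS101referral/Published/2/",
)
print(example_path)
```

The result is the same string as `iapt_path_referral`, so the helper could replace the repeated `lake` / `container` / `file` assignments if preferred.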


### IAPT referrals

In [0]:
iapt_referrals = spark.read.option("header", "true").option("recursiveFileLookup", "true").parquet(iapt_path_referral)

if flag_data_display:
    iapt_referrals.limit(10).display()

In [0]:
if flag_data_check:
    check_iapt_referral_id_unique = iapt_referrals \
        .groupBy("PathwayID").count().filter(F.col("count") > 1)

    if flag_data_display:
        display(check_iapt_referral_id_unique)

In [0]:
if flag_data_check:
        iapt_referrals.filter(F.col("PathwayID") == "6d6a4bfda814475ba409586a19dce42e").orderBy("EFFECTIVE_FROM").display()

There are multiple records per `PathwayID`, where the same referral was reported over subsequent months. There also appear to be exact duplicates of some records (same `EFFECTIVE_FROM` timestamp, same `UDALFieldID` and same `UniqueID_IDS101`), which means `UniqueID_IDS101` is not actually unique. Plan:
- Filter for the latest `EFFECTIVE_FROM` timestamp per `PathwayID`, using a row-number approach to ensure only one record is returned per `PathwayID` as the official version.

## update 2025-07-11
Since setting the referral path to only include 'Published' data and selecting a specific version of the published data, `UniqueID_IDS101` is now unique among the list of records. There are still multiple records per `PathwayID`, but these all appear to be submissions made over multiple months whilst the referral was active, which is to be expected.
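
The 'latest record per `PathwayID`' plan can be illustrated on toy data in plain Python. This is a sketch only, not the notebook's implementation: the function `latest_per_key` is hypothetical, and using `UniqueID_IDS101` as a tie-breaker is an assumption. The notebook orders by `EFFECTIVE_FROM` alone, so if two records for the same pathway share a timestamp, the row chosen by `row_number()` is arbitrary.

```python
def latest_per_key(records, key, order_by):
    """Keep one record per key: the one ranking highest on the order_by columns.

    Plain-Python analogue of row_number() over a window partitioned by `key`
    and ordered descending by `order_by`, keeping row 1.
    """
    latest = {}
    for rec in records:
        k = rec[key]
        rank = tuple(rec[c] for c in order_by)
        if k not in latest or rank > tuple(latest[k][c] for c in order_by):
            latest[k] = rec
    return list(latest.values())


# toy data: pathway "a" has two records sharing the latest timestamp
toy = [
    {"PathwayID": "a", "EFFECTIVE_FROM": "2024-01-01", "UniqueID_IDS101": 1},
    {"PathwayID": "a", "EFFECTIVE_FROM": "2024-02-01", "UniqueID_IDS101": 2},
    {"PathwayID": "a", "EFFECTIVE_FROM": "2024-02-01", "UniqueID_IDS101": 3},
    {"PathwayID": "b", "EFFECTIVE_FROM": "2024-01-15", "UniqueID_IDS101": 4},
]

# the second ordering column is an assumed tie-breaker
result = latest_per_key(toy, "PathwayID", ["EFFECTIVE_FROM", "UniqueID_IDS101"])
print(result)
```

With the tie-breaker in place the same record is chosen on every run, which makes downstream counts reproducible.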

In [0]:
# define a window spec per referral pathway, newest record first
window_spec = W.partitionBy("PathwayID").orderBy(F.col("EFFECTIVE_FROM").desc())

# get the latest record for each PathwayID
iapt_referrals_latest = (
    iapt_referrals
        .withColumn("row_number", F.row_number().over(window_spec)) # assign 1,2,3... within each id
        .filter(F.col("row_number") == 1) # keep only top row per id
        .drop("row_number")
)

if flag_data_display:
    iapt_referrals_latest.limit(10).display()

In [0]:
if flag_data_check:
    check_iapt_referral_id_unique_v2 = iapt_referrals_latest \
        .groupBy("PathwayID").count().filter(F.col("count") > 1)

    if flag_data_display:
        display(check_iapt_referral_id_unique_v2)

Yes, they're unique now.

### IAPT MPI
Get some details for the patient, such as deprivation (based on LSOA), gender identity, language and ethnicity.

In [0]:
iapt_mpi = spark.read.option("header", "true").option("recursiveFileLookup", "true").parquet(iapt_path_mpi)

if flag_data_display:
    iapt_mpi.limit(10).display()

In [0]:
if flag_data_check:
    check_mpi_id_unique = iapt_mpi.groupBy("Person_ID", "UniqueSubmissionID").count().filter(F.col("count") > 1)

    if flag_data_display:
        display(check_mpi_id_unique)

No, it appears the combination of `Person_ID` and `UniqueSubmissionID` is not unique within this table.

**IMPORTANT NOTE**

We probably want to get the details for each person at the point of their discharge, or the latest submission if their referral is still active.

This means we need to join this data to the referrals table on both `Person_ID` and `UniqueSubmissionID` to ensure the patient details are contemporaneous with the referral.

In [0]:
if flag_data_check:
    iapt_mpi.filter((F.col("Person_ID") == "N6STX48XQU3Y8JT") & (F.col("UniqueSubmissionID") == "9534275")).display()

Note on duplicate MPI records:

This is fine and probably what we want. We can use the multiple records per person to link MPI to referrals on both `Person_ID` and `UniqueSubmissionID`, thereby getting the LSOA (address) that applied at the time of the referral submission.

See the 'Joins' section below for details of how this was achieved.
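
To show why the join uses both keys, here is a small plain-Python sketch on invented toy rows. The function `left_join_on_composite_key` is hypothetical, and the values are made up for illustration. A person with two submissions keeps the deprivation decile from the submission that matches the referral, rather than an arbitrary one.

```python
def left_join_on_composite_key(left, right, keys, detail_cols):
    """Left-join two lists of dicts on several key columns.

    Unmatched left rows get None for every detail column, as in a SQL left join.
    Assumes `right` has at most one row per key combination.
    """
    lookup = {tuple(r[k] for k in keys): r for r in right}
    joined = []
    for row in left:
        match = lookup.get(tuple(row[k] for k in keys))
        details = {c: (match[c] if match else None) for c in detail_cols}
        joined.append({**row, **details})
    return joined


# invented toy rows: person P1 moved between submissions 100 and 200
mpi = [
    {"Person_ID": "P1", "UniqueSubmissionID": "100", "IndicesOfDeprivationDecile": 3},
    {"Person_ID": "P1", "UniqueSubmissionID": "200", "IndicesOfDeprivationDecile": 7},
]
referrals = [
    {"PathwayID": "a", "Person_ID": "P1", "UniqueSubmissionID": "200"},
    {"PathwayID": "b", "Person_ID": "P2", "UniqueSubmissionID": "200"},
]

joined = left_join_on_composite_key(
    referrals, mpi,
    keys=["Person_ID", "UniqueSubmissionID"],
    detail_cols=["IndicesOfDeprivationDecile"],
)
print(joined)
```

Joining on `Person_ID` alone would match pathway `a` to both MPI rows and duplicate the referral, which is what the composite key avoids.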

### Provider details
We want some details about the organisation that provided Talking Therapies services to patients.

In [0]:
ods = spark.read.option("header","true").option("recursiveFileLookup","true").parquet(ods_path)

if flag_data_display:
    ods.limit(10).display()

In [0]:
if flag_data_check:
    check_ods_codes_unique = ods \
        .groupBy("ODS_Code").count().filter(F.col("count") > 1)

    if flag_data_display:
        display(check_ods_codes_unique)

Yes, all ODS codes are unique to the ODS table.

### IAPT contacts
Load in care contact details

In [0]:
iapt_contacts = spark.read.option("header", "true").option("recursiveFileLookup", "true").parquet(iapt_path_contact)

if flag_data_display:
    iapt_contacts.limit(10).display()

In [0]:
if flag_data_check:
    check_contact_codes_unique = iapt_contacts \
        .groupBy("Unique_CareContactID").count().filter(F.col("count") > 1)

    if flag_data_display:
        display(check_contact_codes_unique)

In [0]:
if flag_data_display:
    # there are some duplicates, and some IDs have a large number of records. Let's take a look at one
    iapt_contacts.filter(F.col("Unique_CareContactID") == "RX3YE6090764").display()

OK, so it looks like a `Unique_CareContactID` can refer to multiple records. This appears to be due to submissions over multiple months, and the details **can** change between submissions, e.g. in the above example the contact date and time both change.

Speculation: the clinician changed some (but not all) details for a contact, possibly to correct an erroneous record. In this case, it looks like they changed the date and time of the appointment and also changed attendance from 'attended' to being 'cancelled' by the patient.

In [0]:
# define a window spec per care contact, newest record first
window_spec = W.partitionBy("Unique_CareContactID").orderBy(F.col("EFFECTIVE_FROM").desc())

# get the latest record for each Unique_CareContactID
iapt_contacts_latest = (
    iapt_contacts
        .withColumn("row_number", F.row_number().over(window_spec)) # assign 1,2,3... within each id
        .filter(F.col("row_number") == 1) # keep only top row per id
        .drop("row_number")
)

if flag_data_display:
    iapt_contacts_latest.limit(10).display()

In [0]:
if flag_data_check:
    check_contact_codes_unique_v2 = iapt_contacts_latest \
        .groupBy("Unique_CareContactID").count().filter(F.col("count") > 1)

    if flag_data_display:
        display(check_contact_codes_unique_v2)

Yes, they are unique now. :)

## Joins

### Referrals - MPI

In [0]:
# get a truncated version of the MPI containing gender, deprivation and ethnicity
iapt_mpi_short = iapt_mpi.select(
    # keys
    "Person_ID", 
    "UniqueSubmissionID",
    "EFFECTIVE_FROM",
    "RecordNumber",
    # details
    "Gender",
    "GenderIdentity",
    "IndicesOfDeprivationDecile",
    "IMD_YEAR",
    "EthnicCategory",
    "Validated_EthnicCategory",
    "EthnicCategory2021"
).distinct()

In [0]:
# There are some records that appear to show multiple records for the same person / unique submission ID
# These need to be consolidated to one row per person / unique submission ID combo.

# define a window spec for each person in MPI
window_spec = W.partitionBy("Person_ID", "UniqueSubmissionID").orderBy(F.col("EFFECTIVE_FROM").desc(), F.col("RecordNumber").desc())

# get the latest record for each person
iapt_mpi_short_unique = (
    iapt_mpi_short
        .withColumn("row_number", F.row_number().over(window_spec)) # assign 1,2,3... within each id
        .filter(F.col("row_number") == 1) # keep only top row per id
        .drop("row_number")
)

if flag_data_display:
    iapt_mpi_short_unique.limit(10).display()

In [0]:
if flag_data_check:
    check_mpi_short_multiples = iapt_mpi_short_unique \
        .groupBy("Person_ID", "UniqueSubmissionID").count().filter(F.col("count") > 1) 

    if flag_data_display:
        display(check_mpi_short_multiples.orderBy("count", ascending=False))

Great news - we have unique MPI records for each Person/Submission ID combo. This df is ready for joining with the referrals records.

In [0]:
iapt_referrals_latest_test = iapt_referrals_latest \
  .join(iapt_mpi_short_unique, 
        on = (
          (iapt_referrals_latest["Person_ID"] == iapt_mpi_short_unique["Person_ID"]) &
          (iapt_referrals_latest["UniqueSubmissionID"] == iapt_mpi_short_unique["UniqueSubmissionID"])
        ),
        how = "left"
  )

if flag_data_display:
    iapt_referrals_latest_test.limit(10).display()

In [0]:
if flag_data_check:
    n_rows_referrals = iapt_referrals_latest.count()
    n_rows_referrals_test = iapt_referrals_latest_test.count()
    print(f"Original: {n_rows_referrals:,}")
    print(f"Test:     {n_rows_referrals_test:,}")

Great news - the number of referrals remains constant after the left-join.
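
The before/after comparison above is done by eye. A minimal sketch of a guard that fails loudly instead is shown here; the function name `assert_row_count_unchanged` is hypothetical. In the notebook it would be called with the two `.count()` results, e.g. `n_rows_referrals` and `n_rows_referrals_test`.

```python
def assert_row_count_unchanged(n_before: int, n_after: int, label: str = "join") -> None:
    """Raise if a join changed the number of rows.

    A left join onto a table that is unique on the join keys should return
    exactly as many rows as the left-hand table.
    """
    if n_before != n_after:
        raise ValueError(
            f"{label}: row count changed from {n_before:,} to {n_after:,}; "
            "the right-hand table is probably not unique on the join keys"
        )


# example with made-up counts: passes silently when the counts agree
assert_row_count_unchanged(1_000, 1_000, label="referrals - MPI")
```

A check like this could run even when `flag_data_display` is `False`, at the cost of two extra `count()` actions.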

In [0]:
iapt_referrals_latest = iapt_referrals_latest_test

### Referrals - ODS

In [0]:
iapt_referrals_latest = iapt_referrals_latest.join(
    ods, 
    on = iapt_referrals_latest["OrgID_Provider"] == ods["ODS_Code"], 
    how = "left"
)

if flag_data_display:
    iapt_referrals_latest.limit(10).display()

In [0]:
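if flag_data_check:
    
    # use a separate name so the `ods` reference table is not overwritten (it is joined to contacts later)
    ods_matched = iapt_referrals_latest.select("ODS_Code", "Name").withColumnRenamed("Name", "ODS_Name").distinct()
    
    if flag_data_display:
        ods_matched.display()
else:
    display("Data check flag is set to False")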

All `ODS_Code` values appear to have corresponding `ODS_Name` values, indicating all provider codes have been identified.
There are 181 providers of Talking Therapies listed. This appears to be the right order of magnitude when compared with the range of providers reported in the IAPT dashboard.

In [0]:
if flag_data_check:
    # see if there are any nulls in the joined ODS `Name` field - i.e. checking for unmatched `OrgID_Provider`
    null_count = iapt_referrals_latest.groupBy("OrgID_Provider").agg(F.count(F.when(F.col("Name").isNull(), True)).alias("null_ods_names"))

    if flag_data_display:
        display(null_count)

else:
    display("Data check flag is set to False")

All provider codes appear to be matched with corresponding provider names.

### Contacts - ODS
Link the organisation name to the care contact

In [0]:
iapt_contacts_latest = iapt_contacts_latest.join(
    ods, 
    on = iapt_contacts_latest["OrgID_Provider"] == ods["ODS_Code"], 
    how = "left"
)

if flag_data_display:
    iapt_contacts_latest.limit(10).display()

# Explore

## Referrals

In [0]:
# get the total number of referrals (denominator)
total_referrals_count = (
    iapt_referrals_latest
    # only work with records that have been discharged
    .filter(F.col("ServDischDate").isNotNull())
    .select(F.countDistinct("PathwayID")).collect()[0][0]
)

# get the total number of referrals that were seen and taken on for a course of treatment (filtered denominator)
total_referrals_filtered_count = (
    iapt_referrals_latest
    # only work with records that have been discharged
    .filter(
        (F.col("ServDischDate").isNotNull()) &
        (F.col("UsePathway_Flag") == True) & 
        (F.col("EndCode").isin([46, 47, 48, 49, 96]))
    )
    .select(F.countDistinct("PathwayID")).collect()[0][0]
)


### Age distribution

In [0]:
if flag_data_check:
    referral_age_distribution = iapt_referrals_latest \
        .groupBy(F.col("Age_ReferralRequest_ReceivedDate")) \
            .agg(F.countDistinct(F.col("Unique_ServiceRequestID")).alias("unique_referrals"))\
                .orderBy("Age_ReferralRequest_ReceivedDate")

    if flag_data_display:
        referral_age_distribution.display()
        
else:
    display("Data check flag is set to False")

Several hundred referrals were received for patients recorded as aged 0, which doesn't look right. There are a (relatively) small number of referrals for people aged 0 to 15, presumably data quality issues.
The bulk of referrals ramp upwards from age 16 onwards, which fits references to NHS TT being an adult service that can also take adolescents.


In [0]:
# Let's see what happens if we focus on referrals received after 1 Jan 2022
if flag_data_check:
    referral_age_distribution_since2022 = iapt_referrals \
        .filter(F.col("ReferralRequestReceivedDate") > F.lit("2022-01-01")) \
            .groupBy(F.col("Age_ReferralRequest_ReceivedDate")) \
                .agg(F.countDistinct(F.col("Unique_ServiceRequestID")).alias("unique_referrals"))\
                    .orderBy("Age_ReferralRequest_ReceivedDate")

    if flag_data_display:
        referral_age_distribution_since2022.display()
        
else:
    display("Data check flag is set to False")

There are still quite a few referrals for people aged 0 to 15 in this sample.
The number of referrals ramps up from age 16 onward, which fits expectations.

### Referrals per month
Looking to get a count of unique referrals by month the referral was received and discharged. These will be the denominators for the aggregate summaries per provider.

In [0]:
# work out the year-month for referrals received and discharged
iapt_referrals_latest = iapt_referrals_latest \
    .withColumn("calc_Referral_Received_YM", F.date_format("ReferralRequestReceivedDate", "yyyy-MM")) \
        .withColumn("calc_Referral_Discharged_YM", F.date_format("ServDischDate", "yyyy-MM"))

if flag_data_display:
    iapt_referrals_latest \
        .select("Unique_ServiceRequestID", "ReferralRequestReceivedDate", "ServDischDate", "calc_Referral_Received_YM", "calc_Referral_Discharged_YM") \
            .limit(10).display()

In [0]:
if flag_data_check:
  # Count referrals received
  iapt_referrals_received_count = iapt_referrals_latest \
    .groupBy("calc_Referral_Received_YM") \
      .agg(F.countDistinct("PathwayID").alias("referrals_received_count")) \
        .orderBy("calc_Referral_Received_YM")
  # Count referrals discharged
  iapt_referrals_discharged_count = iapt_referrals_latest \
    .groupBy("calc_Referral_Discharged_YM") \
      .agg(F.countDistinct("PathwayID").alias("referrals_discharged_count")) \
        .orderBy("calc_Referral_Discharged_YM")
else:
  display("Flag Data Check is set to False")

In [0]:
if flag_data_check:
    iapt_referrals_received_count.display()
else:
    display("Flag Data Check is set to False")

In [0]:
if flag_data_check:
    iapt_referrals_discharged_count.display()
else:
    display("Flag Data Check is set to False")

**Referrals over time - comments**
- Some referrals have a received date as far back as the 1940s.
- These are not just a few extreme outliers: referrals are counted in ones and twos a month in each decade (1950s, 1960s, 1970s, 1980s, etc.). Possibly linked with the person's year of birth?
- The count of referrals begins to ramp up from 2019 onwards, reaching what is currently the steady state of 140k referrals per month around September 2020.

In [0]:
if flag_data_check:
  # Count distinct pathwayid by YearMonth of discharge
  iapt_referrals_per_month_provider = iapt_referrals_latest \
    .groupBy("calc_Referral_Discharged_YM", "ODS_Code", "Name") \
      .agg(F.countDistinct("PathwayID").alias("referral_count")) \
        .orderBy(F.desc("calc_Referral_Discharged_YM"), "ODS_Code")
  
  if flag_data_display:
    iapt_referrals_per_month_provider.display()

else:
  display("Flag Data Check is set to False")

### Referral discharge reason

In [0]:
if flag_data_check:
    # summarise referrals
    ( iapt_referrals_latest
        # only work with records that have been discharged
        .filter(
            (F.col("ServDischDate").isNotNull()) &
            (F.col("UsePathway_Flag") == True)
        )
        
        # count referrals by end code (discharge reason)
        .groupBy("EndCode")
        .agg(F.countDistinct("PathwayID").alias("referrals_count"))
        .withColumn("referrals_rate", F.col("referrals_count") / total_referrals_count)
        .withColumn("referrals_perc", F.format_number("referrals_rate", 2))
        .orderBy(F.col("referrals_count").desc())
        .display()
    )

- 25% `46` Mutually agreed completion of treatment
- 16% `50` Not assessed
- 16% `47` Termination of treatment earlier than Care Professional planned
- 11% `14` Suitable for IAPT service, but patient declined treatment that was offered
- 6% `13` Referred to another therapy service by mutual agreement
- 5% `11` Not suitable for IAPT service - signposted elsewhere with mutual agreement of patient

... 26 rows of information returned. The results are too messy to make sense of at this granularity. Will group the codes according to the ToS groups to see what that shows.

In [0]:
if flag_data_check:
    # let's group the end codes to improve readability
    end_code_mapping = {
        "Referred but not seen": [50],
        "Seen but not taken on for a course of treatment": [10, 11, 12, 13, 14, 16, 17, 95],
        "Seen and taken on for a course of treatment": [46, 47, 48, 49, 96]
    }

    # convert the dictionary to a dataframe
    mapping_list = [(code, group) for group, codes in end_code_mapping.items() for code in codes]
    mapping_df = spark.createDataFrame(mapping_list, ["EndCode", "calc_endcode_group"])

    # summarise referrals by end code group
    ( iapt_referrals_latest
        # only work with records that have been discharged
        .filter(
            (F.col("ServDischDate").isNotNull()) &
            (F.col("UsePathway_Flag") == True)
        )

        # add the end code groups
        .join(mapping_df, on="EndCode", how="left")
        
        # count referrals by end code group
        .groupBy("calc_endcode_group")
        .agg(F.countDistinct("PathwayID").alias("referrals_count"))
        .withColumn("referrals_rate", F.col("referrals_count") / total_referrals_count)
        .withColumn("referrals_perc", F.format_number("referrals_rate", 2))
        .orderBy(F.col("referrals_count").desc())
        .display()
    )

- 43% Seen and taken on for a course of treatment
- 36% Seen but not taken on for a course of treatment
- 16% Referred but not seen
- 4% `null`

I think we need to filter referrals for those that were seen and taken on for a course of treatment. There will be a very high rate of dropout otherwise, which will distort the figures used when matching services, risking inappropriate matches.
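
The grouping above leaves about 4% of referrals in a `null` group. A plain-Python sketch of the same mapping is shown below; it makes the two possible causes distinguishable. The labels `"Missing"` and `"Unmapped"` and the function name `end_code_group` are hypothetical additions, and the sketch assumes `EndCode` is held as an integer.

```python
# same groups as the notebook's end_code_mapping
END_CODE_GROUPS = {
    "Referred but not seen": [50],
    "Seen but not taken on for a course of treatment": [10, 11, 12, 13, 14, 16, 17, 95],
    "Seen and taken on for a course of treatment": [46, 47, 48, 49, 96],
}

# invert to a code -> group lookup
CODE_TO_GROUP = {
    code: group for group, codes in END_CODE_GROUPS.items() for code in codes
}


def end_code_group(end_code):
    """Return the group for an end code, separating missing from unmapped codes."""
    if end_code is None:
        return "Missing"   # hypothetical label: no end code recorded
    # hypothetical label: a code is present but is not in any group
    return CODE_TO_GROUP.get(end_code, "Unmapped")


for code in (46, 50, 14, 99, None):
    print(code, "->", end_code_group(code))
```

Splitting the `null` group this way would show whether the 4% is mostly referrals with no end code or codes that are absent from the mapping.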

### Referral problem

In [0]:
if flag_data_check:
    # summarise referrals
    ( iapt_referrals_latest
        # only work with records that have been discharged
        .filter(
            (F.col("ServDischDate").isNotNull()) &
            (F.col("UsePathway_Flag") == True) & 
            (F.col("EndCode").isin([46, 47, 48, 49, 96]))
        )
        
        # count referrals by presenting complaint (higher category)
        .groupBy("PresentingComplaintHigherCategory")
        .agg(F.countDistinct("PathwayID").alias("referrals_count"))
        .withColumn("referrals_rate", F.col("referrals_count") / total_referrals_filtered_count)
        .withColumn("referrals_perc", F.format_number("referrals_rate", 2))
        .orderBy(F.col("referrals_count").desc())
        .display()
    )

- 42% Anxiety and stress related disorders (Total)
- 36% Depression
- 14% Unspecified
- 8% `null`
- 1% Other Mental Health problems
- 1% Other recorded problems
- 0% Invalid data supplied
- 0% Medically Unexplained Symptoms

So, it looks like `Anxiety and stress related disorders (Total)` is the code to use for the matching variables.

---


NB, the below list is based on referrals without a filter for 'Seen and taken on for a course of treatment'. I've kept it here for reference only.
- 39% Unspecified
- 27% Anxiety and stress related disorders
- 25% Depression
- 7% `null`
- 1% Other Mental Health problems
- 1% Other recorded problems
- 0% Invalid data supplied
- 0% Medically unexplained symptoms

### Gender and Gender identity

In [0]:
if flag_data_check:
    # summarise referrals
    ( iapt_referrals_latest
        # only work with records that have been discharged
        .filter(
            (F.col("ServDischDate").isNotNull()) &
            (F.col("UsePathway_Flag") == True) & 
            (F.col("EndCode").isin([46, 47, 48, 49, 96]))
        )

        # count activity by Gender
        .groupBy("Gender")
        .agg(F.countDistinct("PathwayID").alias("referrals_count"))
        .withColumn("referrals_rate", F.col("referrals_count") / total_referrals_filtered_count)
        .withColumn("referrals_perc", F.format_number("referrals_rate", 2))
        .orderBy(F.col("referrals_count").desc())
        .display()
    )

- 68% `2` Female
- 31% `1` Male
- 0% `X` Not known
- 0% `9` Indeterminate
- 0% `null`  
... also includes rows for `Z`, `C`, `3`, `0` and `4`, which are not codes in the current IAPT ToS.

In [0]:
if flag_data_check:
    # summarise referrals
    ( iapt_referrals_latest
        # only work with records that have been discharged
        .filter(
            (F.col("ServDischDate").isNotNull()) &
            (F.col("UsePathway_Flag") == True) & 
            (F.col("EndCode").isin([46, 47, 48, 49, 96]))
        )

        # count activity by GenderIdentity
        .groupBy("GenderIdentity")
        .agg(F.countDistinct("PathwayID").alias("referrals_count"))
        .withColumn("referrals_rate", F.col("referrals_count") / total_referrals_filtered_count)
        .withColumn("referrals_perc", F.format_number("referrals_rate", 2))
        .orderBy(F.col("referrals_count").desc())
        .display()
    )

- 58% `2` Female
- 27% `1` Male
- 12% `null`
- 2% `X` Not known
- 0% `Z` Not stated
- 0% `3` Non-binary
- 0% `4` Other (not listed)  
... also includes codes such as `C`, `F`, `M`, `#`, `9`, `N` which aren't part of the current IAPT ToS.

There are many more `null` values in Gender Identity than in Gender (12% vs 0%).

Assuming the effect of gender on likelihood to adhere to TT is a social construct rather than a biological one, it is preferable to use Gender Identity wherever possible. However, the 12% `null` values may affect the proportion of referrals, especially if they are unevenly distributed by provider. So the plan is to `coalesce` Gender Identity with Gender, replacing `null` Gender Identities with the value in Gender.

In [0]:
if flag_data_check:
    # summarise referrals
    ( iapt_referrals_latest
        # only work with records that have been discharged
        .filter(
            (F.col("ServDischDate").isNotNull()) &
            (F.col("UsePathway_Flag") == True) & 
            (F.col("EndCode").isin([46, 47, 48, 49, 96]))
        )

        # count activity by GenderIdentity and Gender
        .groupBy("GenderIdentity", "Gender")
        .agg(F.countDistinct("PathwayID").alias("referrals_count"))
        .withColumn("referrals_rate", F.col("referrals_count") / total_referrals_filtered_count)
        .withColumn("referrals_perc", F.format_number("referrals_rate", 2))
        .orderBy(F.col("referrals_count").desc())
        .display()
    )

Gaps in GenderIdentity can be filled with Gender:
- 8% `null` GI matched with `2` (Female)
- 4% `null` GI matched with `1` (Male)
- 0% `null` GI matched with `Z` (Not a code in the current ToS, but could be 'Not Stated' in Gender Identity)
- 0% `null` GI matched with `9` (Indeterminate)
- 0% `null` GI matched with `X` (Not known)

... there are 999 records where there is a `null` in both GenderIdentity and Gender.

Conclusion: supplementing Gender Identity with Gender codes seems a reasonable approach for these matching variables.
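
The fill rule can be written down as a tiny plain-Python function, given here as an illustrative sketch rather than the notebook's implementation. The function name `coalesce_gender` is hypothetical. In PySpark the equivalent would presumably be `F.coalesce(F.col("GenderIdentity"), F.col("Gender"))`.

```python
def coalesce_gender(gender_identity, gender):
    """Return GenderIdentity when present, otherwise fall back to Gender.

    Only None counts as missing here; codes such as "X" (Not known) are kept
    as they are and are NOT replaced by the Gender value.
    """
    return gender_identity if gender_identity is not None else gender


# toy examples using codes listed in the summaries above
examples = [
    ("2", "2"),    # both present: GenderIdentity wins
    (None, "1"),   # gap filled from Gender
    ("3", "2"),    # non-binary identity is kept
    (None, None),  # still missing after the fill
]
filled = [coalesce_gender(gi, g) for gi, g in examples]
print(filled)
```

One caveat is that the two fields do not share an identical code list (e.g. `9` Indeterminate appears under Gender only), so the combined field may need a recode before use as a matching variable.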

### Deprivation

In [0]:
if flag_data_check:
    # summarise referrals
    ( iapt_referrals_latest
        # only work with records that have been discharged
        .filter(
            (F.col("ServDischDate").isNotNull()) &
            (F.col("UsePathway_Flag") == True) & 
            (F.col("EndCode").isin([46, 47, 48, 49, 96]))
        )

        # count activity by deprivation decile
        .groupBy("IndicesOfDeprivationDecile")
        .agg(F.countDistinct("PathwayID").alias("referrals_count"))
        .withColumn("referrals_rate", F.col("referrals_count") / total_referrals_filtered_count)
        .withColumn("referrals_perc", F.format_number("referrals_rate", 2))
        .orderBy(F.col("referrals_count").desc())
        .display()
    )

- 11% decile 1
- 12% decile 2
- 11% decile 3
- 11% decile 4
- 10% decile 5
- 10% decile 6
- 9% decile 7
- 9% decile 8
- 8% decile 9
- 8% decile 10
- 1% `null`

... TT referrals seem to be slightly more prevalent among people living in areas of higher deprivation than among those living in areas of lower deprivation. There are a small number of `null` entries.

My guess is that these proportions can vary by geographies with some areas facing greater deprivation than others.

### Ethnicity
There are three promising fields in the MPI that can be used: "EthnicCategory", "Validated_EthnicCategory", "EthnicCategory2021".

Will explore each of these.

In [0]:
if flag_data_check:
    # summarise referrals
    ( iapt_referrals_latest
        # only work with records that have been discharged
        .filter(
            (F.col("ServDischDate").isNotNull()) &
            (F.col("UsePathway_Flag") == True) & 
            (F.col("EndCode").isin([46, 47, 48, 49, 96]))
        )

        # count activity by ethnicity
        .groupBy("EthnicCategory")
        .agg(F.countDistinct("PathwayID").alias("referrals_count"))
        .withColumn("referrals_rate", F.col("referrals_count") / total_referrals_filtered_count)
        .withColumn("referrals_perc", F.format_number("referrals_rate", 2))
        .orderBy(F.col("referrals_count").desc())
        .display()
    )

- 71% `A` White British
- 6% `C` Any other White Background
- 3% `Z` Not stated
- 2% `H` Indian
- 2% `99` Not known
- 2% `N` African
- 2% `J` Pakistani
- 2% `S` Any other ethnic group
- 2% `null`

... there were 38 rows returned.

In [0]:
if flag_data_check:
    # summarise referrals
    ( iapt_referrals_latest
        # only work with records that have been discharged
        .filter(
            (F.col("ServDischDate").isNotNull()) &
            (F.col("UsePathway_Flag") == True) & 
            (F.col("EndCode").isin([46, 47, 48, 49, 96]))
        )

        # count activity by ethnicity
        .groupBy("Validated_EthnicCategory")
        .agg(F.countDistinct("PathwayID").alias("referrals_count"))
        .withColumn("referrals_rate", F.col("referrals_count") / total_referrals_filtered_count)
        .withColumn("referrals_perc", F.format_number("referrals_rate", 2))
        .orderBy(F.col("referrals_count").desc())
        .display()
    )

- 71% `A` White British
- 6% `C` Any other White Background
- 3% `Z` Not stated
- 2% `H` Indian
- 2% `99` Not known
- 2% `N` African
- 2% `J` Pakistani
- 2% `S` Any other ethnic group
- 2% `-1` (Not a valid ToS code, ? null)

... There were 21 rows returned.

The ToS describes Validated_EthnicCategory as 'The submitted Ethnic Category with spaces removed and invalid values grouped together'.

The values are very similar to those in EthnicCategory. It may be preferable to use this field over EthnicCategory because of the data cleansing described above.

In [0]:
if flag_data_check:
    # summarise referrals
    ( iapt_referrals_latest
        # only work with records that have been discharged
        .filter(
            (F.col("ServDischDate").isNotNull()) &
            (F.col("UsePathway_Flag") == True) & 
            (F.col("EndCode").isin([46, 47, 48, 49, 96]))
        )

        # count activity by ethnicity
        .groupBy("EthnicCategory2021")
        .agg(F.countDistinct("PathwayID").alias("referrals_count"))
        .withColumn("referrals_rate", F.col("referrals_count") / total_referrals_filtered_count)
        .withColumn("referrals_perc", F.format_number("referrals_rate", 2))
        .orderBy(F.col("referrals_count").desc())
        .display()
    )

Wow, all rows are `null`. Don't use this field.

### RTT

In [0]:
if flag_data_check:
    # summarise referrals
    (
        iapt_referrals_latest
        # only work with records that have been discharged
        .filter(
            (F.col("ServDischDate").isNotNull()) &
            (F.col("UsePathway_Flag") == True) & 
            (F.col("EndCode").isin([46, 47, 48, 49, 96]))
        )

        # calculate days and weeks between therapy session first date and referral date (RTT)
        .withColumn(
            "calc_RTT_days",
            F.datediff(F.col("TherapySession_FirstDate"), F.col("ReferralRequestReceivedDate"))
        )
        .withColumn(
            "calc_RTT_weeks",
            (F.datediff(F.col("TherapySession_FirstDate"), F.col("ReferralRequestReceivedDate")) / 7).cast("int")
        )

        # count referrals by RTT in whole weeks
        .groupBy("calc_RTT_weeks")
        .agg(F.countDistinct("PathwayID").alias("referrals_count"))
        .withColumn("referrals_rate", F.col("referrals_count") / total_referrals_filtered_count)
        .withColumn("referrals_perc", F.format_number(F.col("referrals_rate") * 100, 2))
        .orderBy(F.col("referrals_count").desc())
        .display()
    )

- 28% 0 weeks
- 21% 1 week
- 14% 2 weeks
- 10% 3 weeks
- 7% `null`
- 6% 4 weeks
- 4% 5 weeks
- 2% 6 weeks
- 2% 7 weeks
- 1% 8 weeks
- 1% 9 weeks

... 208 rows returned.
93% begin treatment within 6 weeks of referral (very impressive).
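
For reference, the week bucketing used above (`datediff / 7` cast to `int`) can be mirrored in plain Python. This is a sketch with a hypothetical function name, `rtt_weeks`, and invented dates. The cast truncates toward zero, so bucket `k` covers waits of `7k` to `7k + 6` days, and small negative waits (first session dated up to 6 days before the referral) also land in week 0.

```python
from datetime import date


def rtt_weeks(referral_received, first_session):
    """Whole weeks between referral and first therapy session, or None if either is missing."""
    if referral_received is None or first_session is None:
        return None
    days = (first_session - referral_received).days
    # int() truncates toward zero, like casting a double to int
    return int(days / 7)


print(rtt_weeks(date(2024, 1, 1), date(2024, 1, 7)))    # 6 days
print(rtt_weeks(date(2024, 1, 1), date(2024, 1, 8)))    # 7 days
print(rtt_weeks(date(2024, 1, 1), date(2024, 2, 12)))   # 42 days
print(rtt_weeks(date(2024, 1, 1), date(2023, 12, 29)))  # -3 days
print(rtt_weeks(date(2024, 1, 1), None))                # missing first session
```

Because 42 days falls in bucket 6, a 'within 6 weeks' figure depends on whether bucket 6 is included; grouping on `calc_RTT_days` would remove that ambiguity.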

### Step-up therapy intensity

In [0]:
# summarise referrals
(
    iapt_referrals_latest
    # only work with records that have been discharged
    .filter(
        (F.col("ServDischDate").isNotNull()) &
        (F.col("UsePathway_Flag") == True) & 
        (F.col("EndCode").isin([46, 47, 48, 49, 96]))
    )

    # flag records where there is a step-up from low-intensity to high-intensity therapy
    .withColumn(
        "calc_step_up_therapy_flag", 
        F.when(F.col("LowIntensityTherapy_FirstDate") < F.col("HighIntensityTherapy_FirstDate"), True).otherwise(False)
    )

    # count referrals by step-up flag
    .groupBy("calc_step_up_therapy_flag")
    .agg(F.countDistinct("PathwayID").alias("referrals_count"))
    .withColumn("referrals_rate", F.col("referrals_count") / total_referrals_filtered_count)
    .withColumn("referrals_perc", F.format_number(F.col("referrals_rate") * 100, 2))
    .orderBy(F.col("referrals_count").desc())
    .display()
)

- 75% False (no step-up intensity)
- 25% True (step-up intensity)

In [0]:
# summarise referrals
(
    iapt_referrals_latest
    # only work with records that have been discharged
    .filter(
        (F.col("ServDischDate").isNotNull()) &
        (F.col("UsePathway_Flag") == True) & 
        (F.col("EndCode").isin([46, 47, 48, 49, 96]))
    )

    # flag records where there is a step-up from low-intensity to high-intensity therapy
    .withColumn(
        "calc_step_up_therapy_flag", 
        F.when(F.col("LowIntensityTherapy_FirstDate") < F.col("HighIntensityTherapy_FirstDate"), True).otherwise(False)
    )
    .withColumn(
        "calc_both_dates_complete",
        F.when(F.col("LowIntensityTherapy_FirstDate").isNotNull() & F.col("HighIntensityTherapy_FirstDate").isNotNull(), True).otherwise(False)
    )

    # show some examples
    .select(F.col("calc_step_up_therapy_flag"), F.col("LowIntensityTherapy_FirstDate"), F.col("HighIntensityTherapy_FirstDate"), F.col("calc_both_dates_complete"))
    .limit(1000)
    .display()
)

Notes from reviewing these 1,000 records:
- where either the LI or HI first date is `null`, the flag is `False` (expected behaviour),
- where both dates are present and LI is earlier than HI, the flag is `True` (expected behaviour),
- in the few examples where the HI date was earlier than the LI date, the flag was `False` (expected behaviour),
- in the few examples where the LI and HI dates were the same, the flag was `False` (expected behaviour).
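
These cases follow from SQL null semantics: a comparison involving a `null` date evaluates to `null`, `F.when` treats that as not matched, and `.otherwise(False)` applies. A minimal plain-Python sketch of the same rule, for illustration only (the function name `step_up_flag` and the example dates are hypothetical, not taken from the notebook or the data):

```python
from datetime import date
from typing import Optional


def step_up_flag(li_first: Optional[date], hi_first: Optional[date]) -> bool:
    """Mirror of calc_step_up_therapy_flag: True only when both dates
    are present and low-intensity therapy started strictly first."""
    if li_first is None or hi_first is None:
        # Spark: a comparison with null is null, so .otherwise(False) applies
        return False
    return li_first < hi_first


# hypothetical dates covering the four cases reviewed above
examples = {
    "missing date": step_up_flag(None, date(2024, 3, 1)),
    "LI before HI": step_up_flag(date(2024, 1, 10), date(2024, 3, 1)),
    "HI before LI": step_up_flag(date(2024, 3, 1), date(2024, 1, 10)),
    "same day": step_up_flag(date(2024, 3, 1), date(2024, 3, 1)),
}
```

Only the "LI before HI" case is flagged; same-day starts are not treated as a step-up because the comparison is strict.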

## Contacts

In [0]:
# get the total number of contacts (denominator)
total_contacts_count = (
    iapt_contacts_latest
    # limit to attended contacts (5) or attended late contacts (6)
    .filter(F.col("AttendOrDNACode").isin("5", "6"))
    .select(F.countDistinct("Unique_CareContactID")).collect()[0][0]
)

### Contacts per month
Looking to calculate some matching variables based on a profile of contacts.

In [0]:
# work out the year-month for contacts
iapt_contacts_latest = iapt_contacts_latest \
    .withColumn("calc_Contact_YM", F.date_format("CareContDate", "yyyy-MM"))

if flag_data_display:
    iapt_contacts_latest \
        .select("Unique_ServiceRequestID", "Unique_CareContactID", "CareContDate", "calc_Contact_YM") \
            .limit(10).display()

In [0]:
if flag_data_display:
    iapt_contacts_latest \
        .groupBy("calc_Contact_YM") \
        .agg(F.countDistinct("Unique_CareContactID").alias("contacts_count")) \
            .orderBy("calc_Contact_YM").display()

### Contact location

In [0]:
if flag_data_check:
    # summarise contacts by location
    ( iapt_contacts_latest
        # limit to attended contacts (5) or attended late contacts (6)
        .filter(F.col("AttendOrDNACode").isin("5", "6"))
        
        # count activity by location
        .groupBy("ActLocTypeCode")
        .agg(F.countDistinct("Unique_CareContactID").alias("contacts_count"))
        .withColumn("contacts_rate", F.col("contacts_count") / total_contacts_count)
        .withColumn("contacts_perc", F.format_number(F.col("contacts_rate") * 100, 2))
        .orderBy(F.col("contacts_count").desc())
        .display()
    )

This is interesting, there are many more contacts in a non-hospital setting than I was expecting: 
- 40% of attended contacts have a recorded location of `X01` (Other locations not elsewhere classified),
- 15% are `A01` (Patient's home), 
- 14% `null`, 
- 10% `A04` (Other patient related location),
- 9% `B01` (Primary care health centre),
- 3% `E01` (Out-Patient clinic),
- 2% `C01` (GP)

... very few seem to be in locations that would pose logistical issues in attending.

In [0]:
if flag_data_check:
    # let's simplify the locations to improve readability
    act_loc_type_mapping = {
        "Patient main residence": ["A01", "A02", "A03", "A04"],
        "Health Centre premises": ["B01", "B02"],
        "GP / Dentist / Ophthalmic": ["C01", "C02", "C03"],
        "Walk in Centres": ["D01", "D02", "D03"],
        "Hospital premises": ["E01", "E02", "E03", "E04", "E99"],
        "Hospice": ["F01"],
        "Nursing / Residential home": ["G01", "G02", "G03", "G04"],
        "Day Centre premises": ["H01"],
        "Resource Centre premises": ["J01"],
        "Children and Family premises": ["K01", "K02"],
        "Educational premises": ["L01", "L02", "L03", "L04", "L05", "L06", "L99"],
        "Justice premises": ["M01", "M02", "M03", "M04", "M05", "M06", "M07"],
        "Public locations": ["N01", "N02", "N03", "N04", "N05"],
        "Other locations": ["X01"]
    }

    # convert the dictionary to a dataframe
    mapping_list = [(code, group) for group, codes in act_loc_type_mapping.items() for code in codes]
    mapping_df = spark.createDataFrame(mapping_list, ["ActLocTypeCode", "calc_LocationGroup"])

    # summarise contacts by location group
    ( iapt_contacts_latest
        # limit to attended contacts (5) or attended late contacts (6)
        .filter(F.col("AttendOrDNACode").isin("5", "6"))

        # add the location groups
        .join(mapping_df, on="ActLocTypeCode", how="left")
        
        # count activity by location group
        .groupBy("calc_LocationGroup")
        .agg(F.countDistinct("Unique_CareContactID").alias("contacts_count"))
        .withColumn("contacts_rate", F.col("contacts_count") / total_contacts_count)
        .withColumn("contacts_perc", F.format_number(F.col("contacts_rate") * 100, 2))
        .orderBy(F.col("contacts_count").desc())
        .display()
    )

OK, the list now looks like this:
- 40% Other locations
- 24% Patient main residence
- 14% `null`
- 9% Health centre
- 5% Hospital
- 4% Public locations
- 2% GP

It is not clear what the `Other` locations are, or why they aren't covered by the extensive list of codes. Query: is this a data quality issue?

The `null` records are also prominent, being the third most frequently recorded option. Again, raises queries re: data quality.

Patient main residence seems quite high at 24% of contacts. Perhaps this is a result of telephone contacts?

### Contact method

In [0]:
if flag_data_check:
    # summarise contacts by method / mechanism
    ( iapt_contacts_latest
        # limit to attended contacts (5) or attended late contacts (6)
        .filter(F.col("AttendOrDNACode").isin("5", "6"))
        
        # count activity by method
        .groupBy("ConsMechanism")
        .agg(F.countDistinct("Unique_CareContactID").alias("contacts_count"))
        .withColumn("contacts_rate", F.col("contacts_count") / total_contacts_count)
        .withColumn("contacts_perc", F.format_number(F.col("contacts_rate") * 100, 2))
        .orderBy(F.col("contacts_count").desc())
        .display()
    )

A summary of key findings:
- 43% `02` (Telephone)
- 22% `11` (Video consultation)
- 16% `01` (Face to face)
- 14% `null`
- 2%  `98` (Other - not listed)
- 1%  `03` (Code no longer valid, an old code)
- 1%  `05` (Email)

So the largest share of contacts is by telephone (43%), followed by video consultation (22%), which helps to explain why so many contacts are recorded at the patient's main residence (see above).
Around 16% of contacts are face-to-face (F2F).

### Contact timings
Look at whether contacts are:
- weekday / weekend
- inside / outside 9am-5pm
- the intersection of the above

In [0]:
if flag_data_check:
    # summarise contacts weekday
    (iapt_contacts_latest
        # limit to attended contacts (5) or attended late contacts (6)
        .filter(F.col("AttendOrDNACode").isin("5", "6"))
        
        # count activity by weekday
        .withColumn("calc_weekday", F.weekday(F.col("CareContDate")))
        .groupBy("calc_weekday")
        .agg(F.countDistinct("Unique_CareContactID").alias("contacts_count"))
        .withColumn("contacts_rate", F.col("contacts_count") / total_contacts_count)
        .withColumn("contacts_perc", F.format_number(F.col("contacts_rate") * 100, 2))
        .orderBy(F.col("contacts_count").desc())
        .display()
    )

- `0` Monday (19%)
- `1` Tuesday (23%)
- `2` Wednesday (22%)
- `3` Thursday (21%)
- `4` Friday (15%)
- `5` Saturday (1%)
- `6` Sunday (0%)

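The vast majority of contacts (99%) are conducted on weekdays.

NB, using the `weekday()` function, which numbers days from Monday (`0`) to Sunday (`6`) in line with the ISO 8601 Monday-first week, rather than `dayofweek()`, which numbers them from Sunday (`1`) to Saturday (`7`).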
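
To make the numbering difference concrete, here is a small plain-Python sketch. Python's `date.weekday()` uses the same Monday = 0 to Sunday = 6 numbering as Spark's `weekday()`. The helper `spark_dayofweek` is a hypothetical stand-in written for this note; it reproduces the Sunday = 1 to Saturday = 7 numbering of Spark's `dayofweek()`.

```python
from datetime import date


def spark_dayofweek(d: date) -> int:
    """Hypothetical helper: Sunday = 1 ... Saturday = 7, as in Spark's dayofweek()."""
    return d.isoweekday() % 7 + 1


monday = date(2024, 1, 1)  # a Monday
sunday = date(2024, 1, 7)  # the following Sunday

numbering = {
    # Monday-first, 0-based (same as Spark's weekday())
    "weekday": (monday.weekday(), sunday.weekday()),
    # ISO 8601 day numbers, 1-based
    "isoweekday": (monday.isoweekday(), sunday.isoweekday()),
    # Sunday-first, 1-based (same as Spark's dayofweek())
    "dayofweek": (spark_dayofweek(monday), spark_dayofweek(sunday)),
}

# weekday test used in this notebook: Monday (0) to Friday (4)
is_weekday = 0 <= monday.weekday() <= 4
```

With the Monday-first numbering, the working week is the contiguous range 0 to 4, which is what makes `between(0, 4)` a convenient weekday test.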

In [0]:
if flag_data_check:
    # summarise contacts by hour of day
    ( iapt_contacts_latest
        # limit to attended contacts (5) or attended late contacts (6)
        .filter(F.col("AttendOrDNACode").isin("5", "6"))
        
        # count activity by hour of day
        .withColumn("calc_hour", F.hour(F.col("CareContTime")))
        .groupBy("calc_hour")
        .agg(F.countDistinct("Unique_CareContactID").alias("contacts_count"))
        .withColumn("contacts_rate", F.col("contacts_count") / total_contacts_count)
        .withColumn("contacts_perc", F.format_number(F.col("contacts_rate") * 100, 2))
        .orderBy(F.col("contacts_count").desc())
        .display()
    )

- 0am 0% (but there are 83k records at this time, possibly a DQ issue)
- 1am 0%
- 2am 0%
- 3am 0%
- 4am 0%
- 5am 0%
- 6am 0%
- 7am 0%
- 8am 3%
- 9am 13%
- 10am 14%
- 11am 12%
- 12pm 9%
- 1pm 12%
- 2pm 11%
- 3pm 9%
- 4pm 6%
- 5pm 5%
- 6pm 4%
- 7pm 1%
- 8pm 0%
- 9pm 0%
- 10pm 0%
- 11pm 0%
- `null` 1%

Most contacts take place within 'working hours' (9am-5pm), with a smaller number early (8am, 3%) and later (6pm and 7pm, 5% combined).

In [0]:
if flag_data_check:
    # summarise contacts by office hours
    ( iapt_contacts_latest
        # limit to attended contacts (5) or attended late contacts (6)
        .filter(F.col("AttendOrDNACode").isin("5", "6"))
        
        # count activity by in / out office-hours (9am-5pm, Mon-Fri)
        # NB, between() is inclusive, so contacts starting 17:00-17:59 count as office hours
        .withColumn(
            "calc_officehours", 
            F.when(F.weekday(F.col("CareContDate")).between(0, 4) & 
                F.hour(F.col("CareContTime")).between(9, 17), "Office hours").otherwise("Outside office hours")
        )
        .groupBy("calc_officehours")
        .agg(F.countDistinct("Unique_CareContactID").alias("contacts_count"))
        .withColumn("contacts_rate", F.col("contacts_count") / total_contacts_count)
        .withColumn("contacts_perc", F.format_number(F.col("contacts_rate") * 100, 2))
        .orderBy(F.col("contacts_count").desc())
        .display()
    )

- Office hours (89%)
- Outside office hours (11%)
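
Two details affect how these figures should be read: `between(9, 17)` is inclusive at both ends, so a contact starting at 17:30 has hour 17 and counts as office hours, and a contact with no recorded time falls through to `Outside office hours`. A plain-Python sketch of the same classification, for illustration only (the function name `office_hours_label` and the sample values are hypothetical):

```python
from typing import Optional


def office_hours_label(weekday: int, hour: Optional[int]) -> str:
    """Mirror of calc_officehours: weekday is 0 (Monday) to 6 (Sunday),
    hour is the hour of the contact start time, or None if not recorded."""
    in_week = 0 <= weekday <= 4
    # inclusive bounds, like Column.between(9, 17): 17:00-17:59 is included
    in_hours = hour is not None and 9 <= hour <= 17
    return "Office hours" if in_week and in_hours else "Outside office hours"


labels = [
    office_hours_label(1, 9),     # Tuesday 09:xx
    office_hours_label(1, 17),    # Tuesday 17:xx
    office_hours_label(1, 18),    # Tuesday 18:xx
    office_hours_label(5, 10),    # Saturday 10:xx
    office_hours_label(1, None),  # Tuesday, time not recorded
]
```

If a strict 9am to 5pm window were wanted, the upper bound would be 16 rather than 17.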

### Interpreters

In [0]:
if flag_data_check:
    # summarise contacts by interpreter present code
    ( iapt_contacts_latest
        # limit to attended contacts (5) or attended late contacts (6)
        .filter(F.col("AttendOrDNACode").isin("5", "6"))
        
        # count activity by interpreter present
        .groupBy("InterpreterPresentInd")
        .agg(F.countDistinct("Unique_CareContactID").alias("contacts_count"))
        .withColumn("contacts_rate", F.col("contacts_count") / total_contacts_count)
        .withColumn("contacts_perc", F.format_number(F.col("contacts_rate") * 100, 2))
        .orderBy(F.col("contacts_count").desc())
        .display()
    )

- 70% `null`
- 28% `4` (No, Interpreter not required)
- 2% `X` (Not known, not recorded)
- 0% `1` (Yes, Professional interpreter)
- 0% `5` (No, Interpreter was required but did not attend)
- 0% `3` (Yes, Another person)
- 0% `2` (Yes, Family member or friend)

I suppose the 70% `null` response can be interpreted as interpreter not required? Seems this question could be considered superfluous if both therapist and patient spoke the same language.

In [0]:
if flag_data_check:
    # summarise contacts by language used code
    ( iapt_contacts_latest
        # limit to attended contacts (5) or attended late contacts (6)
        .filter(F.col("AttendOrDNACode").isin("5", "6"))
        
        # count activity by treatment language
        .groupBy("LanguageCodeTreat")
        .agg(F.countDistinct("Unique_CareContactID").alias("contacts_count"))
        .withColumn("contacts_rate", F.col("contacts_count") / total_contacts_count)
        .withColumn("contacts_perc", F.format_number(F.col("contacts_rate") * 100, 2))
        .orderBy(F.col("contacts_count").desc())
        .display()
    )

- 72% `null`
- 27% `en`
- 0% `EN`
- 0% `pl`
- 0% `q4`
- 0% `ur`
- 0% `fa`
- 0% `ar`
...
There were 197 rows (languages) returned.

Important to note that English is coded as both `en` and `EN` (lower and upper case). Also note the high rate of `null` records: presumably English is treated as the default language and not coded, or this may be a DQ issue.

NB, `q4` is British Sign Language.
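
Because English appears under both `en` and `EN`, any count of English-language contacts needs to normalise case first, and needs an explicit decision on whether `null` counts as English. A minimal plain-Python sketch of that normalisation (the helper name `is_english` and the sample codes are illustrative, not drawn from the data; in Spark the analogous step would be `F.lower()` applied to `LanguageCodeTreat`):

```python
from typing import Optional


def is_english(code: Optional[str], null_is_english: bool = False) -> bool:
    """Hypothetical helper: case-insensitive match on the 'en' language code.
    null_is_english makes the treatment of missing codes an explicit choice."""
    if code is None:
        return null_is_english
    return code.strip().lower() == "en"


sample_codes = ["en", "EN", None, "pl", "q4"]  # illustrative values only

# count English contacts with nulls excluded, then with nulls assumed English
strict = sum(is_english(c) for c in sample_codes)
lenient = sum(is_english(c, null_is_english=True) for c in sample_codes)
```

Given that roughly 72% of attended contacts have no language recorded, the choice between the strict and lenient readings would dominate the resulting proportion.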

### Internet-enabled contacts

In [0]:
if flag_data_check:
    # summarise contacts by internet-enabled therapy programme
    ( iapt_contacts_latest
        # limit to attended contacts (5) or attended late contacts (6)
        .filter(F.col("AttendOrDNACode").isin("5", "6"))
        
        # count activity by internet-enabled therapy programme
        .groupBy("IntEnabledTherProg")
        .agg(F.countDistinct("Unique_CareContactID").alias("contacts_count"))
        .withColumn("contacts_rate", F.col("contacts_count") / total_contacts_count)
        .withColumn("contacts_perc", F.format_number(F.col("contacts_rate") * 100, 2))
        .orderBy(F.col("contacts_count").desc())
        .display()
    )

- 100% `null`
- 0% Silvercloud
- 0% Slvrcld Dep & Anx
- 0% Silvercloud ...
... there were 52 rows returned. It looks like the response is the name of the internet-enabled therapy programme.

Note, check for a non-null response in this field to determine whether an internet-enabled therapy (IET) programme was used.

### Contact therapy mode


In [0]:
if flag_data_check:
    # summarise contacts by patient therapy mode
    ( iapt_contacts_latest
        # limit to attended contacts (5) or attended late contacts (6)
        .filter(F.col("AttendOrDNACode").isin("5", "6"))
        
        # count activity by patient therapy mode
        .groupBy("CareContPatientTherMode")
        .agg(F.countDistinct("Unique_CareContactID").alias("contacts_count"))
        .withColumn("contacts_rate", F.col("contacts_count") / total_contacts_count)
        .withColumn("contacts_perc", F.format_number(F.col("contacts_rate") * 100, 2))
        .orderBy(F.col("contacts_count").desc())
        .display()
    )

- 78% `1` Individual patient
- 16% `null`
- 6% `3` Group therapy
- 0% `2` Couple

Most contacts are with individual patients. The `null` responses pose an issue here: how do we treat these codes to produce the matching variables?

a) Count `1` (individual). This could cause problems if the `null` is provider-specific, i.e. some providers leave the field blank on the assumption that `1` is the de facto default.

b) Count `3` and `2` together (group and couple)?

I think I prefer option b.
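
The trade-off can be made concrete with the rounded shares listed above, treated as counts per 100 attended contacts. This is a rough sketch only: the variable names are hypothetical, and the assumption that every `null` is really an individual contact is a worst case used purely for illustration.

```python
# rounded shares from the summary above, as counts per 100 attended contacts
mode_counts = {"1": 78, None: 16, "3": 6, "2": 0}
total = sum(mode_counts.values())

# option a: proportion explicitly coded as individual ("1")
option_a = mode_counts["1"] / total

# option a if every null were in fact an individual contact (worst case)
option_a_if_null_individual = (mode_counts["1"] + mode_counts[None]) / total

# option b: proportion explicitly coded as group ("3") or couple ("2")
option_b = (mode_counts["3"] + mode_counts["2"]) / total

# how far option a could move depending on what the nulls really are
option_a_uncertainty = option_a_if_null_individual - option_a
```

Under that assumption option a shifts from 78% to 94% while option b stays at 6%. If the nulls were instead unrecorded group or couple contacts, option b would be the measure affected.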

# Matching variables
We proposed the following measures as matching variables in the 'Matching Variables' .pptx file (July 2025):

Referrals:
- **Number of referrals discharged** each month (proxy for size of service).
- Proportion of referrals for people **aged 25 years and younger** at referral.
- Proportion of referrals for people **aged 60 years and older** at referral.
- Proportion of referrals for people whose **gender identity is female**.
- Proportion of referrals for people whose **LSOA of residence is among the 20% most deprived** in England.
- Proportion of referrals for people whose **LSOA of residence is among the 20% least deprived** in England.
- Proportion of referrals for people whose **broad ethnic background is `White`**.
- Proportion of referrals where the **referral-to-treatment wait time is within six weeks**.
- Proportion of referrals where there was a **step-up to high-intensity** therapy.

Contacts:
- Proportion of care contacts where the **therapist has attained an NHS TT qualification**.
- Proportion of care contacts conducted **on hospital premises**.
- Proportion of care contacts conducted **face-to-face**.
- Proportion of care contacts conducted **outside of weekdays, 9am to 5pm**.
- Proportion of care contacts conducted **in English**.
- Proportion of care contacts conducted **with an interpreter present**.
- Proportion of care contacts delivered **as internet enabled therapy**.

#### Referrals

In [0]:
# remind myself what fields are and the data they hold
iapt_referrals_latest.limit(10).display()

In [0]:
# I'm preparing this list of matching variables in advance of sign-off by TT clients to save time
# NB, this section may need revisiting once final set of matching variables agreed
matching_referrals = (
    iapt_referrals_latest
        # only work with records that have been discharged
        .filter(F.col("ServDischDate").isNotNull())
        # coalesce gender identity and gender to get a single value - i.e. fill in gaps in gender identity with values from gender
        .withColumn("calc_gender_identity", F.coalesce(F.col("GenderIdentity"), F.col("Gender")))
        # flag records where there is a step-up from low-intensity to high-intensity therapy
        .withColumn("calc_step_up_therapy_flag", F.when(F.col("LowIntensityTherapy_FirstDate") < F.col("HighIntensityTherapy_FirstDate"), True).otherwise(False))
        # calculate matching variables
        .groupBy("calc_Referral_Discharged_YM", "ODS_Code", "Name").agg(

            # Total referrals discharged in the month
            F.countDistinct("PathwayID").alias("discharges_count"),

            # Outcome 1 - proportion of referrals where 6 or more completed appointments (1 assessment and 5 treatment)
            F.countDistinct(F.when(F.col("TreatmentCareContact_Count") >= 6, F.col("PathwayID"))).alias("o1_discharges_6_or_more_completed_appointments"),

            # Matching 1 - referrals discharged for people aged 25 years and younger on the date of referral
            F.countDistinct(F.when(F.col("Age_ReferralRequest_ReceivedDate") <= 25, F.col("PathwayID"))).alias("m1_discharges_aged_under_26_at_referral"),

            # Matching 2 - referrals discharged for people aged 60 years and older on the date of referral
            F.countDistinct(F.when(F.col("Age_ReferralRequest_ReceivedDate") >= 60, F.col("PathwayID"))).alias("m2_discharges_aged_60_plus_at_referral"),

            # Matching 3 - referrals discharged for people whose gender identity is female (2)
            F.countDistinct(F.when(F.col("calc_gender_identity") == "2", F.col("PathwayID"))).alias("m3_discharges_female"),

            # Matching 4 - referrals discharged for people whose LSOA of residence is in the 20% most deprived
            F.countDistinct(F.when(F.col("IndicesOfDeprivationDecile").isin(1,2), F.col("PathwayID"))).alias("m4_discharges_20pc_most_deprived"),

            # Matching 5 - referrals discharged for people whose LSOA of residence is in the 20% least deprived
            F.countDistinct(F.when(F.col("IndicesOfDeprivationDecile").isin(9,10), F.col("PathwayID"))).alias("m5_discharges_20pc_least_deprived"),

            # Matching 6 - referrals discharged for people whose ethnicity is 'White', i.e. codes A, B or C
            F.countDistinct(F.when(F.col("Validated_EthnicCategory").isin("A", "B", "C"), F.col("PathwayID"))).alias("m6_discharges_white_ethnicity"),

            # Matching 7 - referrals discharged where there was a step-up from low-intensity to high-intensity therapy
            F.countDistinct(F.when(F.col("calc_step_up_therapy_flag") == True, F.col("PathwayID"))).alias("m7_discharges_step_up_therapy")
        )
)

if flag_data_display:
    matching_referrals.display()

In [0]:
if flag_data_export:
    matching_referrals.write.mode("overwrite").parquet(eval_path + "matching_referrals.parquet")

#### Contacts

Contacts:
- Proportion of care contacts where the **therapist has attained an NHS TT qualification**.
- Proportion of care contacts conducted **on hospital premises**.
- Proportion of care contacts conducted **face-to-face**.
- Proportion of care contacts conducted **outside of weekdays, 9am to 5pm**.
- Proportion of care contacts conducted **in English**.
- Proportion of care contacts conducted **with an interpreter present**.
- Proportion of care contacts delivered **as internet enabled therapy**.
- Proportion of care contacts delivered **to an individual patient**.

In [0]:
if flag_data_display:
  # remind myself what fields are and the data they hold
  iapt_contacts_latest.limit(10).display()

In [0]:
matching_contacts = (
    iapt_contacts_latest
        
        # limit to attended contacts (5) or attended late contacts (6)
        .filter(F.col("AttendOrDNACode").isin("5", "6"))

        # calculate matching variables
        .groupBy("calc_Contact_YM", "ODS_Code", "Name").agg(

            # denominator
            F.countDistinct("Unique_CareContactID").alias("contacts_count"),

            # Matching 1 - contacts where the therapist attained an NHS TT qualification
            #F.countDistinct(F.when(F.col("TherapistAttainedNHS_TT_Qualification") == True, F.col("Unique_CareContactID"))).alias("m1_contacts_therapist_attained_nhs_tt_qualification")

            # Matching 2 - contacts conducted on hospital premises
            F.countDistinct(F.when(
                F.col("ActLocTypeCode").isin("E01", "E02", "E03", "E04", "E99"), 
                F.col("Unique_CareContactID")
                )
            ).alias("m2_contacts_hospital_premises"),

            # Matching 3 - contacts conducted face-to-face
            F.countDistinct(F.when(
                F.col("ConsMechanism").isin("01"),
                F.col("Unique_CareContactID")
                )
            ).alias("m3_contacts_face_to_face"),

            # Matching 4 - contacts conducted outside of weekdays, 9am-5pm
            # NB, contacts on a weekday with no recorded time evaluate to null and are not counted
            F.countDistinct(F.when(
                ~(
                    F.weekday(F.col("CareContDate")).between(0, 4) & 
                    F.hour(F.col("CareContTime")).between(9, 17)
                ),
                F.col("Unique_CareContactID")
                )
            ).alias("m4_contacts_outside_weekdays_9am_5pm"),

            # Matching 5 - contacts conducted in English (coded as `en` or `EN`)
            F.countDistinct(F.when(
                F.lower(F.col("LanguageCodeTreat")) == "en",
                F.col("Unique_CareContactID")
                )
            ).alias("m5_contacts_conducted_in_english"),

            # Matching 6 - contacts with interpreter present (codes 1, 2 or 3)
            F.countDistinct(F.when(
                F.col("InterpreterPresentInd").isin("1", "2", "3"),
                F.col("Unique_CareContactID")
                )
            ).alias("m6_contacts_interpreter_present")
        )
)

In [0]:
if flag_data_export:
    matching_contacts.write.mode("overwrite").parquet(eval_path + "matching_contacts.parquet")

In [0]:
# NB, this is an early attempt at working with the data, so may not be the best approach.
from pyspark.sql.functions import col, datediff, mean

# filter for UsePathway_Flag = True
# (assumes the latest referrals table built earlier in this notebook)
filtered_df = iapt_referrals_latest.filter(col("UsePathway_Flag") == True)

# calculate the mean number of days from ReferralRequestReceivedDate to TherapySessionFirstDate
mean_diff = filtered_df.select(mean(datediff(col("TherapySessionFirstDate"), col("ReferralRequestReceivedDate"))).alias("mean_diff"))

display(mean_diff)