# RAP Example Python Pipeline - Interactive Exercise

ADD THE GOOGLE COLAB LINK HERE

## Intro

This notebook will show you how straightforward it is to build an analytical pipeline in Python.

The core of any piece of analytical work is to:
- load some data
- do something to that data, e.g. process it or do some analysis
- create some output

This notebook will go briefly through each of these showing *one* way of doing it in Python (there are many more!). 
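As a rough illustration of those three steps, here is a minimal sketch using only the Python standard library. The tiny dataset and the function names (`load_rows`, `total_by_site`, `to_csv_text`) are made up for this example and are not part of the example pipeline repo.

```python
import csv
import io

# Hypothetical stand-in for a real data source: a tiny CSV held in memory
RAW_CSV = "site,attendances\nA,10\nB,5\nA,7\n"


def load_rows(raw_text):
    """Load: parse CSV text into a list of row dictionaries."""
    return list(csv.DictReader(io.StringIO(raw_text)))


def total_by_site(rows):
    """Process: add up the attendances for each site."""
    totals = {}
    for row in rows:
        totals[row["site"]] = totals.get(row["site"], 0) + int(row["attendances"])
    return totals


def to_csv_text(totals):
    """Output: write the totals out as CSV text, one row per site."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["site", "total_attendances"])
    for site, total in sorted(totals.items()):
        writer.writerow([site, total])
    return buffer.getvalue()


# Chain the three steps together: load -> process -> output
result = to_csv_text(total_by_site(load_rows(RAW_CSV)))
print(result)
```

Each step is its own small function, so any one of them can be swapped out without touching the other two.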

Open this notebook in Google Colab and have a play - try changing bits and see what happens!

**NOTE**: to make the workshop more straightforward, we haven't completely followed good practice. If you want to see how a pipeline should look, well laid out and modularised, [see our Example Python pipeline](https://github.com/NHSDigital/RAP_example_pipeline_python).

## Setup

We will need to install a few things before we can get going.

First, if this is running in Google Colab, we need to clone the repo and install the right Python packages.

In [2]:
# when running in Google Colab, clone the repo and install the dependencies
if "google.colab" in str(get_ipython()):
    print("Running on Colab")
    !git clone https://github.com/NHSDigital/RAP_example_pipeline_python.git -q
    %cd RAP_example_pipeline_python
    !pip install -r requirements.txt -q -q

Next we need to import the right libraries for this piece of work:

In [16]:
import logging # this allows us to write log messages helping with any future auditing and debugging
import timeit # this allows us to time the execution of the code
from datetime import datetime # this allows us to work with dates and times
import pandas as pd # this allows us to work with dataframes

# these are the modules we have created to help us with the pipeline
from src.utils import file_paths
from src.utils import logging_config
from src.utils import spark as spark_utils
import src.data_ingestion
from src.processing import aggregate_counts
from src.data_exports import write_csv



## Config

It's important that we don't hardcode things which can change into the code - instead we keep things like that in config files.

An example is where the data is to be picked up from and where any outputs will be saved to: these will change from when you are working in "dev" to when the code is finalised and put into "production".
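To make the dev/production idea concrete, here is a minimal sketch of one way to keep the settings out of the code. It is not how `file_paths.get_config()` works in the repo. The JSON layout, the folder names and the `load_settings` helper are all assumptions made up for illustration.

```python
import json

# Hypothetical config contents - in a real project this would live in its own file
CONFIG_TEXT = """
{
  "dev":  {"data_dir": "data_in_sample", "output_dir": "scratch_outputs"},
  "prod": {"data_dir": "data_in", "output_dir": "published_outputs"}
}
"""


def load_settings(config_text, environment):
    """Return the settings for one environment from JSON config text."""
    all_settings = json.loads(config_text)
    if environment not in all_settings:
        raise KeyError(f"Unknown environment: {environment}")
    return all_settings[environment]


# Switching from dev to prod means changing one argument, not the pipeline code
settings = load_settings(CONFIG_TEXT, "dev")
print(settings["output_dir"])
```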

In [17]:
config = file_paths.get_config() 

In [18]:
# initialise and configure logging
logger = logging.getLogger(__name__)
logging_config.configure_logging(config['log_dir'])
logger.info(f"Configured logging with log folder: {config['log_dir']}.")
logger.info(f"Logging the config settings:\n\n\t{config}\n")
logger.info(f"Starting run at:\t{datetime.now().time()}")

2024-06-10 11:42:34,860 - INFO -- 1487816006.py:                <module>():4 -- Configured logging with log folder: .
2024-06-10 11:42:34,861 - INFO -- 1487816006.py:                <module>():5 -- Logging the config settings:

	{'project_name': 'example_pipeline_pyspark_version', 'data_url': 'https://files.digital.nhs.uk/assets/Services/Artificial%20data/Artificial%20HES%20final/artificial_hes_ae_202302_v1_sample.zip', 'path_to_downloaded_data': 'data_in/artificial_hes_ae_202302_v1_sample.zip/artificial_hes_ae_202302_v1_sample/artificial_hes_ae_2122.csv', 'output_dir': '', 'log_dir': ''}

2024-06-10 11:42:34,862 - INFO -- 1487816006.py:                <module>():6 -- Starting run at:	11:42:34.862941
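If you are curious what a helper like `configure_logging` might do behind the scenes, here is a rough sketch using only the standard `logging` module. This is an assumption about the general approach, not the repo's actual implementation. The function name `configure_logging_sketch` and the `pipeline.log` file name are made up.

```python
import logging
import tempfile
from pathlib import Path


def configure_logging_sketch(log_dir):
    """Send INFO-level messages to both the console and a file in log_dir."""
    log_path = Path(log_dir) / "pipeline.log"  # hypothetical file name
    log_path.parent.mkdir(parents=True, exist_ok=True)
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s -- %(funcName)s():%(lineno)d -- %(message)s",
        handlers=[logging.StreamHandler(), logging.FileHandler(log_path)],
        force=True,  # replace any handlers already set up, e.g. by the notebook
    )
    return log_path


# Use a temporary folder here so the sketch does not clutter the project
log_file = configure_logging_sketch(tempfile.mkdtemp())
logging.getLogger(__name__).info("Configured logging.")
```

Writing to a file as well as the console means the messages are still there after the notebook is closed, which is what makes them useful for auditing.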


## Load Data

First we will load the data: we're going to use an artificial version of the NHS Hospital Episode Statistics Accident and Emergency (HES AE) data for the 2021/22 financial year (the `artificial_hes_ae_2122.csv` file named in the config logged above).

We've hidden all the complexity of acquiring the data away in a function called `get_data`. This is good practice, because:

1. this data might be used many times in many different pipelines - this function can be reused, saving your colleagues time
2. the way the data is acquired might change, e.g. on different platforms. To accommodate this we only need to add to, change or improve this function - your downstream pipeline should continue as normal

This function:
- gets the location of the data from the config file
- downloads the zipped CSV
- loads that CSV into a pandas dataframe in memory

This is just an example - in another setting we could make it load the data from a SQL server, or from a database, S3 bucket, etc.
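For instance, a SQL-backed version could look something like the sketch below. It uses an in-memory SQLite database as a stand-in for a real server. The table name `ae_attendances`, its contents and the function name `get_data_from_sql` are made up for illustration.

```python
import sqlite3

import pandas as pd


def get_data_from_sql(connection, table_name):
    """Load every row of a table into a pandas DataFrame."""
    # table_name is put straight into the query, so it should come from
    # trusted config rather than from user input
    return pd.read_sql_query(f"SELECT * FROM {table_name}", connection)


# Build a tiny in-memory database to stand in for a real SQL server
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE ae_attendances (AEKEY INTEGER, FYEAR INTEGER)")
connection.executemany(
    "INSERT INTO ae_attendances VALUES (?, ?)", [(1, 2122), (2, 2122), (3, 2122)]
)
connection.commit()

df_from_sql = get_data_from_sql(connection, "ae_attendances")
print(df_from_sql.shape)
connection.close()
```

The rest of the pipeline only ever sees a DataFrame, so it would not need to know that the source had changed.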

In [19]:
def get_data(config):
    """Get the data from the data source and return it as a pandas dataframe
    
    Args:
        config (dict): the configuration dictionary

    Returns:
        pandas dataframe: the data    
    """

    # get the data location from the config
    data_location = config['data_url']
    print("the data came from here: ", data_location) # let's print the location so you can see where it is stored - it's a publicly available zip.

    # download the CSV file
    src.data_ingestion.get_data.download_zip_from_url(data_location, overwrite=True)
    logger.info("Downloaded data as zip.")

    # read the CSV file into a pandas dataframe
    df_data = pd.read_csv(config['path_to_downloaded_data'])

    return df_data

Now we will use our `get_data` function to... get the data! Look how simple it makes the code below to read - it does what it says on the tin.

In [20]:
df_hes_data = get_data(config)

the data came from here:  https://files.digital.nhs.uk/assets/Services/Artificial%20data/Artificial%20HES%20final/artificial_hes_ae_202302_v1_sample.zip
2024-06-10 11:42:38,509 - INFO -- 734666532.py:                get_data():10 -- Downloaded data as zip.


Let's see what this data looks like by pulling the first 5 rows:

In [23]:
df_hes_data.head(5)

| Unnamed: 0 | FYEAR | PARTYEAR | PSEUDO_HESID | AEKEY | AEKEY_FLAG | AEARRIVALMODE | AEATTEND_EXC_PLANNED | AEATTENDCAT | AEATTENDDISP | AEDEPTTYPE | ... | LSOA11 | MSOA11 | PROVDIST | PROVDIST_FLAG | NER_GP_PRACTICE | NER_RESIDENCE | NER_TREATMENT | SITETRET | SITEDIST | SITEDIST_FLAG |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2122 | 202103 | TESTqPNh7HEHdm1sB5QlvVaSQZS7BekK | 910587081231 | 1 | 2 | 1 | 1 | 3 | 1 | ... | E01000385 | E02001768 | 19.37 | 3.0 | QKS | QKS | QHM | RW601 | 4.89 | 5.0 |
| 1 | 2122 | 202103 | TESTqPNh7HEHdm1sB5QlvVaSQZS7BekK | 747777461989 | 1 | 2 | 1 | 1 | 3 | 1 | ... | E01030571 | E02004833 | 3.96 | 3.0 | QMJ | QYG | QKS | RY901 | 1.21 | 5.0 |
| 2 | 2122 | 202103 | TESTqPNh7HEHdm1sB5QlvVaSQZS7BekK | 244053969711 | 1 | 2 | 1 | 1 | 3 | 3 | ... | E01008938 | E02005828 |  | 3.0 | QWE | QKK | QWO | RJC02 | 15.16 | 5.0 |
| 3 | 2122 | 202103 | TESTqPNh7HEHdm1sB5QlvVaSQZS7BekK | 425257514835 | 1 | 2 | 1 | 1 | 1 | 1 | ... | E01030533 | E02000912 | 23.68 | 3.0 | QMJ | QRV | QOP | RJE07 | 3.16 | 5.0 |
| 4 | 2122 | 202103 | TESTqPNh7HEHdm1sB5QlvVaSQZS7BekK | 892001219292 | 1 | 2 | 1 | 1 | 3 | 1 | ... | E01025434 | E02004315 | 5.85 | 3.0 | QMF | QM7 | QMJ | RDE03 | 2.41 | 5.0 |


## Processing

Now the fun part - we get to do some interesting processing on the data.

The simplest piece of processing you might do is to get a distinct count of the values in one of the columns.

Again, we create a function to do this - for a very small bit of processing like this it might not make a lot of sense, but if you were doing a larger derivation that might feasibly be used in other work, it could really save someone else some time!

In [24]:
def get_distinct_count(df: pd.DataFrame, col_to_aggregate: str) -> int:
    """Returns the number of distinct values in a column of a pandas DataFrame.
    
    Args:
        df (pd.DataFrame): the pandas DataFrame
        col_to_aggregate (str): the column to aggregate

    Returns:
        int: the number of distinct values
    """
    return df[col_to_aggregate].nunique()

Let's run our simple analysis and print the result:

In [25]:
distinct_epikey_count = get_distinct_count(df_hes_data, 'EPIKEY')
print(f"Distinct EPIKEY count: {distinct_epikey_count}")

Distinct EPIKEY count: 10000
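To see exactly what `nunique()` counts, here is a small sketch on a made-up DataFrame (the values are invented for illustration). Duplicates are counted once, and missing values are ignored unless you ask for them with `dropna=False`.

```python
import pandas as pd

# Toy data: one duplicated key (102) and one missing value
df_toy = pd.DataFrame({"EPIKEY": [101, 102, 102, None, 103]})

# By default, missing values are left out of the distinct count
distinct = df_toy["EPIKEY"].nunique()

# With dropna=False, the missing value is counted as one extra distinct value
distinct_with_missing = df_toy["EPIKEY"].nunique(dropna=False)

print(distinct, distinct_with_missing)
```

This matters if the column you are counting can contain blanks: the default count tells you how many real keys there are, not how many different things appear in the column.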


In [14]:
# Creating dictionary to hold outputs
outputs = {}

# Count number of episodes in England - place this in the outputs dictionary
outputs["df_hes_england_count"] = get_distinct_count(df_hes_data, 'EPIKEY')

# Save each output as a CSV:
from pathlib import Path

for output_name, output in outputs.items():

    # Create a one-row DataFrame holding the count
    df_output = pd.DataFrame({'england_count': [output]})

    # prep the filepath and ensure the directory exists
    output_dir = Path(f'data_out/{output_name}')
    output_dir.mkdir(parents=True, exist_ok=True)
    output_filename = output_dir / f'{output_name}.csv'

    # Save the DataFrame to a CSV file
    df_output.to_csv(output_filename, index=False)
    logger.info(f"saved output df to {output_filename}")

2024-05-28 14:18:00,084 - INFO -- 1024386828.py:                <module>():24 -- saved output df to data_out/df_hes_england_count/df_hes_england_count.csv
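In keeping with the "put it in a function" advice from earlier, the saving step could itself be wrapped up for reuse. The sketch below is one possible way to do that. The function name `save_count_as_csv` is made up, and it writes to a temporary folder rather than `data_out` so that it can be run safely anywhere.

```python
import tempfile
from pathlib import Path

import pandas as pd


def save_count_as_csv(count, output_name, base_dir):
    """Write a single count to <base_dir>/<output_name>/<output_name>.csv."""
    output_dir = Path(base_dir) / output_name
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / f"{output_name}.csv"
    pd.DataFrame({"england_count": [count]}).to_csv(output_path, index=False)
    return output_path


# Save the count from the analysis above, then read it back as a quick check
base_dir = tempfile.mkdtemp()
saved_path = save_count_as_csv(10000, "df_hes_england_count", base_dir)
df_check = pd.read_csv(saved_path)
print(df_check)
```

Reading the file straight back in is a cheap way to confirm that the output looks the way you expect before anyone else picks it up.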
