In [None]:
"""
************ SETUP - DON'T TOUCH **************
This section imports data from the configuration database
and should not need to be altered, molested or otherwise messed with. 
~~These are not the droids you are looking for~~
"""
from core.constants import BRANCH_NAME, ENV_BUCKET
from core.helpers.session_helper import SessionHelper
from core.models.configuration import Transformation
from dataclasses import dataclass
from core.dataset_contract import DatasetContract

# Session used for every configuration-database call in this notebook; it is closed after publish.
session = SessionHelper().session
# Load the configuration row for this run.
# NOTE(review): `transform_id` is not defined anywhere in this notebook -- presumably it is
# injected by the runner before this cell executes; confirm.
# NOTE(review): `.one()` looks like the SQLAlchemy query API, which raises unless exactly one
# row matches -- confirm against SessionHelper.
db_transform = session.query(Transformation).filter(Transformation.id == transform_id).one()

@dataclass
class DbTransform:
    """Base configuration for a transform, populated from the `db_transform` row.

    Every field default is an expression evaluated once, when this class body runs,
    against the module-level `db_transform` and `BRANCH_NAME`. Instances created
    without arguments therefore all carry the same values.
    """
    id: int = db_transform.id ## the instance id of the transform in the config app
    name: str = db_transform.transformation_template.name ## the transform name in the config app
    state: str = db_transform.pipeline_state.pipeline_state_type.name ## the pipeline state, one of raw, ingest, master, enhance, enrich, metrics, dimensional
    branch:str = BRANCH_NAME ## the git branch for this execution 
    brand: str = db_transform.pipeline_state.pipeline.brand.name ## the pharma brand name
    pharmaceutical_company: str = db_transform.pipeline_state.pipeline.brand.pharmaceutical_company.name # the pharma company name
    # The contract this transform publishes to, built from the same branch / company / brand /
    # state / template-name values as the fields above. The DatasetContract is constructed once
    # here, so it is a single object shared by every instance that uses the default.
    publish_contract: DatasetContract = DatasetContract(branch=BRANCH_NAME,
                            parent=db_transform.pipeline_state.pipeline.brand.pharmaceutical_company.name,
                            child=db_transform.pipeline_state.pipeline.brand.name,
                            state=db_transform.pipeline_state.pipeline_state_type.name,
                            dataset=db_transform.transformation_template.name)


# CORE Cartridge Notebook :: symphony_health_association_refinement
![CORE Logo](assets/coreLogo.png) 

---
## Keep in Mind
Good Transforms Are...
- **singular in purpose:** good transforms do one and only one thing, and handle all known cases for that thing. 
- **repeatable:** transforms should be written in a way that they can be run against the same dataset an infinite number of times and get the same result every time. 
- **easy to read:** 99 times out of 100, readable, clear code that runs a little slower is more valuable than a mess that runs quickly. 
- **No 'magic numbers':** if a variable or function is not instantly obvious as to what it is or does, without context, maybe consider renaming it.

## Workflow - how to use this notebook to make science
#### Data Science
1. **Document your transform.** Fill out the _description_ cell below describing what it is this transform does; this will appear in the configuration application where Ops will create, configure and update pipelines. 
1. **Define your config object.** Fill out the _configuration_ cell below the commented-out guide to define the variables you want ops to set in the configuration application (these will populate here for every pipeline). 
2. **Build your transformation logic.** Use the transformation cell to do that magic that you do. 
![caution](assets/cautionTape.png)

### Configuration

In [None]:
""" 
********* CONFIGURATION - PLEASE TOUCH ********* 
This section defines what you expect to get from the configuration application 
in a single "transform" object. Define the vars you need here, and comment inline to the right of them 
for all-in-one documentation. 
Engineering will build a production "transform" object for every pipeline that matches what you define here.

@@@ FORMAT OF THE DATA CLASS IS: @@@ 

<value_name>: <data_type> #<comment explaining what the value is to future us>

~~These ARE the droids you are looking for~~
"""

class Transform(DbTransform):
        """Per-pipeline configuration for this transform, read from `db_transform.variables`.

        Apart from the two `ingest_source_*` settings, each attribute is named after a
        column of the internal schema and holds the value configured for it in
        `db_transform.variables`; the notebook later uses those values as the column
        names to look for in the incoming files.

        This class is not decorated with `@dataclass`, so the annotated names below are
        plain class attributes evaluated once when the class body runs; they are not
        added to the `__init__` inherited from `DbTransform`.
        """
        ## YOUR properties go here!!
        ingest_source_transform: str = db_transform.variables.ingest_source_transform # The name of the dataset to pull from
        ingest_source_file_prefix: str = db_transform.variables.ingest_source_file_prefix # If from initial ingest, the file prefix name
        # --- schema column -> configured source column name ---
        transaction_date: str = db_transform.variables.transaction_date
        pharmacy_code: str = db_transform.variables.pharmacy_code
        pharmacy_npi: str = db_transform.variables.pharmacy_npi
        pharmacy_hin: str = db_transform.variables.pharmacy_hin
        pharmacy_name: str = db_transform.variables.pharmacy_name
        pharmacy_ncpdp: str = db_transform.variables.pharmacy_ncpdp
        pharmacy_address_1: str = db_transform.variables.pharmacy_address_1
        pharmacy_address_2: str = db_transform.variables.pharmacy_address_2
        pharmacy_city: str = db_transform.variables.pharmacy_city
        pharmacy_state: str = db_transform.variables.pharmacy_state
        pharmacy_zip: str = db_transform.variables.pharmacy_zip
        transaction_type: str = db_transform.variables.transaction_type
        pharmacy_transaction_id: str = db_transform.variables.pharmacy_transaction_id
        transaction_sequence: str = db_transform.variables.transaction_sequence
        referral_source: str = db_transform.variables.referral_source
        referral_date: str = db_transform.variables.referral_date
        longitudinal_patient_id: str = db_transform.variables.longitudinal_patient_id
        pharmacy_patient_id: str = db_transform.variables.pharmacy_patient_id
        patient_dob: str = db_transform.variables.patient_dob
        hub_patient_id: str = db_transform.variables.hub_patient_id
        bridge_patient: str = db_transform.variables.bridge_patient
        hub_patient: str = db_transform.variables.hub_patient
        patient_gender: str = db_transform.variables.patient_gender
        dx_1: str = db_transform.variables.dx_1
        dx_2: str = db_transform.variables.dx_2
        status_date: str = db_transform.variables.status_date
        status: str = db_transform.variables.status
        substatus: str = db_transform.variables.substatus
        customer_status: str = db_transform.variables.customer_status
        customer_substatus: str = db_transform.variables.customer_substatus
        customer_status_description: str = db_transform.variables.customer_status_description
        hcp_last_name: str = db_transform.variables.hcp_last_name
        hcp_first_name: str = db_transform.variables.hcp_first_name
        hcp_address_1: str = db_transform.variables.hcp_address_1
        hcp_address_2: str = db_transform.variables.hcp_address_2
        hcp_city: str = db_transform.variables.hcp_city
        hcp_state: str = db_transform.variables.hcp_state
        hcp_zip: str = db_transform.variables.hcp_zip
        hcp_phone: str = db_transform.variables.hcp_phone
        hcp_specialty: str = db_transform.variables.hcp_specialty
        hcp_npi: str = db_transform.variables.hcp_npi
        hcp_dea_number: str = db_transform.variables.hcp_dea_number
        hcp_facility: str = db_transform.variables.hcp_facility
        rx_date: str = db_transform.variables.rx_date
        rx_number: str = db_transform.variables.rx_number
        rx_fills: str = db_transform.variables.rx_fills
        rx_fill_number: str = db_transform.variables.rx_fill_number
        rx_refills_remaining: str = db_transform.variables.rx_refills_remaining
        prev_dispensed: str = db_transform.variables.prev_dispensed
        ndc: str = db_transform.variables.ndc
        # Reads the `brand` variable; `brand` itself is already a DbTransform field, so
        # presumably this is named `brand_column` to avoid overriding it -- confirm.
        brand_column: str = db_transform.variables.brand
        medication: str = db_transform.variables.medication
        quantity_dispensed: str = db_transform.variables.quantity_dispensed
        uom_dispensed: str = db_transform.variables.uom_dispensed
        days_supply: str = db_transform.variables.days_supply
        ship_date: str = db_transform.variables.ship_date
        ship_carrier: str = db_transform.variables.ship_carrier
        ship_tracking_id: str = db_transform.variables.ship_tracking_id
        ship_location: str = db_transform.variables.ship_location
        ship_address_1: str = db_transform.variables.ship_address_1
        ship_address_2: str = db_transform.variables.ship_address_2
        ship_city: str = db_transform.variables.ship_city
        ship_state: str = db_transform.variables.ship_state
        ship_zip: str = db_transform.variables.ship_zip
        has_medical_coverage_flag: str = db_transform.variables.has_medical_coverage_flag
        primary_coverage_type: str = db_transform.variables.primary_coverage_type
        primary_payer: str = db_transform.variables.primary_payer
        primary_payer_type: str = db_transform.variables.primary_payer_type
        primary_payer_subtype: str = db_transform.variables.primary_payer_subtype
        primary_payer_group: str = db_transform.variables.primary_payer_group
        primary_payer_bin: str = db_transform.variables.primary_payer_bin
        primary_payer_iin: str = db_transform.variables.primary_payer_iin
        primary_payer_pcn: str = db_transform.variables.primary_payer_pcn
        primary_plan: str = db_transform.variables.primary_plan
        primary_plan_type: str = db_transform.variables.primary_plan_type
        secondary_coverage_type: str = db_transform.variables.secondary_coverage_type
        secondary_payer: str = db_transform.variables.secondary_payer
        secondary_payer_type: str = db_transform.variables.secondary_payer_type
        secondary_payer_subtype: str = db_transform.variables.secondary_payer_subtype
        secondary_payer_group: str = db_transform.variables.secondary_payer_group
        secondary_payer_bin: str = db_transform.variables.secondary_payer_bin
        secondary_payer_iin: str = db_transform.variables.secondary_payer_iin
        secondary_payer_pcn: str = db_transform.variables.secondary_payer_pcn
        secondary_plan: str = db_transform.variables.secondary_plan
        secondary_plan_type: str = db_transform.variables.secondary_plan_type
        primary_plan_paid: str = db_transform.variables.primary_plan_paid
        secondary_plan_paid: str = db_transform.variables.secondary_plan_paid
        primary_copay: str = db_transform.variables.primary_copay
        primary_coins: str = db_transform.variables.primary_coins
        primary_deductible: str = db_transform.variables.primary_deductible
        primary_patient_responsibility: str = db_transform.variables.primary_patient_responsibility
        secondary_copay: str = db_transform.variables.secondary_copay
        secondary_coins: str = db_transform.variables.secondary_coins
        secondary_deductible: str = db_transform.variables.secondary_deductible
        secondary_patient_responsibility: str = db_transform.variables.secondary_patient_responsibility
        copay_as_amount: str = db_transform.variables.copay_as_amount
        other_payer_amount: str = db_transform.variables.other_payer_amount
        transfer_pharmacy: str = db_transform.variables.transfer_pharmacy
        bridge_quantity_dispensed: str = db_transform.variables.bridge_quantity_dispensed
        prior_therapy_name: str = db_transform.variables.prior_therapy_name
        pharmacy_parent_name: str = db_transform.variables.pharmacy_parent_name
        hcp_state_license_number: str = db_transform.variables.hcp_state_license_number
        patient_state: str = db_transform.variables.patient_state
        dose_exchange_flag: str = db_transform.variables.dose_exchange_flag
        dose_exchange_count: str = db_transform.variables.dose_exchange_count
        dose_titration_quantity: str = db_transform.variables.dose_titration_quantity
        dose_titration_count: str = db_transform.variables.dose_titration_count
        oxygen_flag: str = db_transform.variables.oxygen_flag
        patient_zip: str = db_transform.variables.patient_zip
        secondary_payer_flag: str = db_transform.variables.secondary_payer_flag
        restatement_flag: str = db_transform.variables.restatement_flag
        aggregator_transaction_id: str = db_transform.variables.aggregator_transaction_id
        primary_pbm_name: str = db_transform.variables.primary_pbm_name
        referral_number: str = db_transform.variables.referral_number
        hcp_middle_name: str = db_transform.variables.hcp_middle_name
        hcp_suffix: str = db_transform.variables.hcp_suffix
        pharmacy_dea_number: str = db_transform.variables.pharmacy_dea_number
        primary_prior_auth_required_flag: str = db_transform.variables.primary_prior_auth_required_flag
        primary_prior_auth_expiration_date: str = db_transform.variables.primary_prior_auth_expiration_date
        patient_consent_date: str = db_transform.variables.patient_consent_date
        primary_cost_type: str = db_transform.variables.primary_cost_type
        primary_cost_amount: str = db_transform.variables.primary_cost_amount
        patient_support_1: str = db_transform.variables.patient_support_1
        patient_support_2: str = db_transform.variables.patient_support_2
        patient_oop_program_name: str = db_transform.variables.patient_oop_program_name
        bridge_quantity_dispensed_2: str = db_transform.variables.bridge_quantity_dispensed_2
        enroll_received_date: str = db_transform.variables.enroll_received_date
        fitness_for_duty_request_flag: str = db_transform.variables.fitness_for_duty_request_flag
        fitness_for_duty_ship_date: str = db_transform.variables.fitness_for_duty_ship_date
        triage_date: str = db_transform.variables.triage_date
        dose_count: str = db_transform.variables.dose_count

In [None]:
from core.logging import get_logger

In [None]:
# Build the configuration object; called without arguments, so every value is a class-level default.
transform = Transform()
# Logger named after this transform's pipeline state and transform name.
logger = get_logger(f"core.transforms.{transform.state}.{transform.name}")

In [None]:
# The data model is hardcoded as a constant that will require a PR to edit for security.
# column_renames maps each internal schema column (key) to the configured name of that
# column in the incoming files (value). The schema columns are listed once, in schema
# order, and the configured name is looked up on `transform` under the same attribute
# name. The single exception is 'brand', whose setting lives on `transform.brand_column`.
column_renames = {
    schema_column: getattr(
        transform,
        'brand_column' if schema_column == 'brand' else schema_column,
    )
    for schema_column in (
        'transaction_date',
        # pharmacy
        'pharmacy_code', 'pharmacy_npi', 'pharmacy_hin', 'pharmacy_name', 'pharmacy_ncpdp',
        'pharmacy_address_1', 'pharmacy_address_2', 'pharmacy_city', 'pharmacy_state', 'pharmacy_zip',
        # transaction / referral
        'transaction_type', 'pharmacy_transaction_id', 'transaction_sequence',
        'referral_source', 'referral_date',
        # patient
        'longitudinal_patient_id', 'pharmacy_patient_id', 'patient_dob', 'hub_patient_id',
        'bridge_patient', 'hub_patient', 'patient_gender',
        'dx_1', 'dx_2',
        # status
        'status_date', 'status', 'substatus',
        'customer_status', 'customer_substatus', 'customer_status_description',
        # prescriber
        'hcp_last_name', 'hcp_first_name', 'hcp_address_1', 'hcp_address_2',
        'hcp_city', 'hcp_state', 'hcp_zip', 'hcp_phone', 'hcp_specialty',
        'hcp_npi', 'hcp_dea_number', 'hcp_facility',
        # prescription / product
        'rx_date', 'rx_number', 'rx_fills', 'rx_fill_number', 'rx_refills_remaining',
        'prev_dispensed', 'ndc', 'brand', 'medication',
        'quantity_dispensed', 'uom_dispensed', 'days_supply',
        # shipment
        'ship_date', 'ship_carrier', 'ship_tracking_id', 'ship_location',
        'ship_address_1', 'ship_address_2', 'ship_city', 'ship_state', 'ship_zip',
        # primary coverage
        'has_medical_coverage_flag', 'primary_coverage_type', 'primary_payer',
        'primary_payer_type', 'primary_payer_subtype', 'primary_payer_group',
        'primary_payer_bin', 'primary_payer_iin', 'primary_payer_pcn',
        'primary_plan', 'primary_plan_type',
        # secondary coverage
        'secondary_coverage_type', 'secondary_payer',
        'secondary_payer_type', 'secondary_payer_subtype', 'secondary_payer_group',
        'secondary_payer_bin', 'secondary_payer_iin', 'secondary_payer_pcn',
        'secondary_plan', 'secondary_plan_type',
        # payments
        'primary_plan_paid', 'secondary_plan_paid',
        'primary_copay', 'primary_coins', 'primary_deductible', 'primary_patient_responsibility',
        'secondary_copay', 'secondary_coins', 'secondary_deductible', 'secondary_patient_responsibility',
        'copay_as_amount', 'other_payer_amount',
        # remaining schema columns, in schema order
        'transfer_pharmacy', 'bridge_quantity_dispensed', 'prior_therapy_name',
        'pharmacy_parent_name', 'hcp_state_license_number', 'patient_state',
        'dose_exchange_flag', 'dose_exchange_count',
        'dose_titration_quantity', 'dose_titration_count',
        'oxygen_flag', 'patient_zip', 'secondary_payer_flag', 'restatement_flag',
        'aggregator_transaction_id', 'primary_pbm_name', 'referral_number',
        'hcp_middle_name', 'hcp_suffix', 'pharmacy_dea_number',
        'primary_prior_auth_required_flag', 'primary_prior_auth_expiration_date',
        'patient_consent_date', 'primary_cost_type', 'primary_cost_amount',
        'patient_support_1', 'patient_support_2', 'patient_oop_program_name',
        'bridge_quantity_dispensed_2', 'enroll_received_date',
        'fitness_for_duty_request_flag', 'fitness_for_duty_ship_date',
        'triage_date', 'dose_count',
    )
}

# Likewise hardcoded: the internal schema columns a file must provide to be accepted.
# Names here are schema-side keys of column_renames, not the incoming file's column names.
required_columns = [
    # pharmacy / transaction / patient / status
    'pharmacy_npi', 'transaction_sequence', 'pharmacy_patient_id', 'status_date',
    # prescriber
    'hcp_last_name', 'hcp_first_name', 'hcp_address_1', 'hcp_city', 'hcp_state', 'hcp_npi',
    # product and coverage
    'ndc', 'primary_coverage_type', 'primary_payer_type',
    # patient geography
    'patient_state',
]

### Description
What does this transformation do? Be specific.

![what does your transform do](assets/what.gif)

This transform takes patient status data and maps it to our internal schema. It rejects files missing the defined required columns, cuts any extra columns they gave us, and adds in the missing columns filled with empty strings.

### Transformation

In [None]:
import pandas as pd
import numpy as np
import boto3
from s3parq import fetch

In [None]:
# Get dataframes only by file name.
# The set of ingested file names is recovered through string manipulation on the
# object keys found under the ingest prefix.
# TODO: Find a more elegant solution
#    If problematic files get ingested without a file-name directory level they
#    would never be fetched properly.

# Contract describing where the upstream ingest dataset lives.
input_contract = DatasetContract(parent=transform.publish_contract.parent,
                                 child=transform.publish_contract.child,
                                 state="ingest",
                                 dataset=transform.ingest_source_transform)

# Kept without a trailing slash: later cells append "/<file_name>" to it.
ingest_prefix = input_contract.key + "/" + transform.ingest_source_file_prefix
bucket = transform.publish_contract.env

# Number of characters taken up by the prefix itself (the slash after it is handled
# in subtract_key).
key_len = len(ingest_prefix)


def subtract_key(file):
    """Return the object key `file` relative to `ingest_prefix`.

    Drops the prefix plus the single "/" that follows it, so the result starts at the
    file-name directory. Only valid for keys that begin with `ingest_prefix + "/"`.
    """
    # +1 due to the extra slash at the end
    return file[(key_len + 1):]


# List every parquet object under the prefix.
# S3 prefixes are plain string matches, not directories: listing "a/b" also returns
# "a/b2/...". The trailing slash keeps such siblings out, so subtract_key only ever
# sees keys it can slice correctly.
files_with_prefix = []
s3_client = boto3.client('s3')
paginator = s3_client.get_paginator('list_objects')
operation_parameters = {'Bucket': bucket,
                        'Prefix': ingest_prefix + "/"}
page_iterator = paginator.paginate(**operation_parameters)
for page in page_iterator:
    # A page has no "Contents" entry when nothing matched the prefix.
    for item in page.get('Contents', []):
        if item['Key'].endswith('.parquet'):
            files_with_prefix.append(item['Key'])

files_without_prefix = [subtract_key(file) for file in files_with_prefix]

# Need : set of file names
#    Fetching will iterate over them, one fetch per "<ingest_prefix>/<file_name>".
file_names = set()
for file_path in files_without_prefix:
    # Everything but the last segment is a directory; the last one is the parquet object.
    directory_parts = file_path.split("/")[:-1]

    # Partition directories look like "<column>=<value>"; whatever is left is a file name.
    file_names.update(part for part in directory_parts if "=" not in part)

In [None]:
# A dedicated exception type keeps the handling specific: the ingest loop catches
# only this error, so any other exception raised while checking a file still surfaces.
class MissingRequiredColumnError(Exception):
    """Raised when a fetched file lacks one or more required columns."""

In [None]:
def check_required_columns(df: pd.DataFrame) -> None:
    """Verify that `df` carries every required column under its pre-mapping name.

    The required schema columns are translated to the configured source column
    names through `column_renames` and compared against `df.columns`.

    Args:
        df: the dataframe fetched for a single file, before any renaming.

    Raises:
        MissingRequiredColumnError: if at least one required source column is absent.
    """
    logger.debug(f"Expecting the following columns : {list(column_renames.values())}")
    logger.debug(f"Dataframe has these columns : {list(df.columns)}")

    # Translate required schema names into the names the source file is expected to use.
    required_columns_preschema = []
    for schema_name, source_name in column_renames.items():
        if schema_name in required_columns:
            required_columns_preschema.append(source_name)

    logger.debug(f"The following columns are required : {required_columns_preschema}")

    missing_required_columns = set(required_columns_preschema).difference(df.columns)
    if not missing_required_columns:
        return

    logger.error("Data does not have all required columns.")
    raise MissingRequiredColumnError(f"Missing required columns : {missing_required_columns}")

In [None]:
# Result handed to the publish cell; stays an empty dataframe when no file qualifies.
final_dataframe = pd.DataFrame()

# Restrict every fetch to the partition written by the current run.
run_filter = [{"partition": "__metadata_run_id", "comparison": "==", "values": [run_id]}]

# column_renames is keyed schema -> source, but DataFrame.rename needs source -> schema.
# The mapping does not change per file, so invert it once up front.
column_renames_pandas_style = {value: key for key, value in column_renames.items()}
source_columns = set(column_renames.values())

# Frames are collected and concatenated once at the end: DataFrame.append was removed
# in pandas 2.0 and copied the whole accumulated frame on every call.
accepted_frames = []

# file_names is a set; sort it so the row order of the output is the same on every run.
for file_name in sorted(file_names):
    logger.debug(f"Retrieving data from path : {ingest_prefix}")
    logger.debug(f"Ingesting data under file name : {file_name} , with run_id : {run_id}")

    # Run with parallel as False since it is much slower if the data is not large
    file_df = fetch(bucket=bucket, key=(ingest_prefix+"/"+file_name), filters=run_filter, parallel=False)

    logger.debug(f"File data fetched, fetched dataframe shape : {file_df.shape}")

    # Check base requirement fulfillment; a file that fails is skipped, not fatal.
    try:
        check_required_columns(file_df)
    except MissingRequiredColumnError:
        # TODO: this needs to send a notification! That is occurring in a separate story however
        logger.info(f"File :   {file_name}   : is missing required columns and is being skipped.")
        continue

    logger.debug("File meets requirements.")

    # First cut down to the necessary columns in case of accidental extras - Pandas won't catch those
    extra_columns = set(file_df.columns) - source_columns
    file_df = file_df.drop(axis=1, labels=list(extra_columns))

    # Rename source columns to their schema names
    file_df = file_df.rename(column_renames_pandas_style, axis="columns")

    # Add the schema columns the file did not provide, filled with empty strings.
    # Walk column_renames (schema order) rather than a set so that the position of
    # the added columns is deterministic.
    missing_columns = [column for column in column_renames if column not in file_df.columns]
    for column in missing_columns:
        file_df[column] = ""

    accepted_frames.append(file_df)
    logger.debug("File successfully appended.")

# pd.concat raises on an empty list, so keep the empty default when nothing was accepted.
if accepted_frames:
    final_dataframe = pd.concat(accepted_frames)

### Publish

In [None]:
## that's it - just provide the final dataframe to the var final_dataframe and we take it from there
try:
    transform.publish_contract.publish(final_dataframe, run_id, session, False)
finally:
    # Release the configuration-database session even when publishing fails.
    session.close()