# Installation of Requirements

The cluster running this notebook requires the following packages to be installed.  The list is deliberately minimal: functions used in PySpark DataFrame operations have to be pickled and shipped to the executors, so any library you add to simplify this notebook must first be vetted to confirm that it pickles cleanly and can still be parallelized well by Spark.

* pyspark
* pandas

# Imports

Keeping all the imports in one place so they're easy to work through or edit as needed.  These can probably be cleaned up some now that things are functional, but I haven't gotten to that yet.

In [0]:
import hashlib
import hmac
import copy
import boto3
import pandas as pd
from codecs import encode, decode
from random import randrange, shuffle
from collections import defaultdict
from functools import reduce
from decimal import Decimal
from pyspark.sql.functions import col, udf, max
from pyspark.sql.types import StringType, DateType, FloatType, NullType, DecimalType
import pyspark.sql.functions as fn
from pyspark.sql.functions import dense_rank
from pyspark.sql import SparkSession
from pyspark.sql import Window
import random
from decimal import Decimal



# Secrets and Constants

The secret key should be randomly generated for each real run.  It is set to a static value here for testing only, so that output can be compared consistently between runs; that may or may not be desirable, depending on how we want to approach this data and its quality.  For anything other than testing, use a random key, so that fields scrambled with the secret key cannot be reproduced or reversed by someone who knows the key.

In [0]:
# These should be randomly generated for a run, but should be consistent across all files in the run
# NOTE(review): SECRET_KEY is a hard-coded literal. It keys the HMAC-based scramblers
# (string_scramble, ssn_scramble, claim_id_scramble), so anyone holding this value can
# recompute the same scrambled outputs from known inputs.
SECRET_KEY = "EF4359D8D580AA4F7F036D6F04FC6A94"
RUN_NUMBER = 0  # This will increment with each run to keep the testing files separate on the same day.
COMPANY_NAME = "golden_company"

# Location to upload files
BASE_TARGET_DIRECTORY = F"/mnt/artemis-client-etl-data/zeus_files/{COMPANY_NAME}/"

# Used for naming files
RUN_DATE = str(pd.Timestamp.now().date())

# Reference data loaded from S3 parquet files. _get_zip_shuffle and _get_cities_by_zip
# read its `state`, `text_zip` and `primary_city` columns.
CITY_DF = spark.read.parquet("s3a://artemis-client-etl-data/zeus_files/references/current/zip_codes/")

# Function Definitions

Add any new translations that need to be used here - for example, if you have a new column type that you want to run row conversions on, add the function here that you'll call, and then update the run logic section to include that converter.  Then you'd add any conversions in the conversions section so that they'll be picked up.

In [0]:
def _get_zip_shuffle():
    """Build a randomized zip -> zip mapping in which zips are only swapped within a state.

    Reads the ``state`` and ``text_zip`` columns of the module-level ``CITY_DF``.
    ``text_zip`` is split on whitespace, so one row may contribute several zips.
    The zips of each state are shuffled among themselves and the per-state
    mappings are merged into one flat dict. If the same zip appears under more
    than one state, the mapping from the state merged last wins.

    Returns:
        dict: original zip string -> replacement zip string. Empty when the
        reference table has no usable rows.
    """
    # A single collect for the whole table instead of one Spark job per state.
    state_zips = defaultdict(set)
    for row in CITY_DF.select('state', 'text_zip').collect():
        # Null states never matched the per-state equality filter, and a null
        # zip would otherwise be stringified into the bogus zip "None".
        if row['state'] is None or row['text_zip'] is None:
            continue
        state_zips[row['state']].update(str(row['text_zip']).split())

    shuffled_zipmap = {}
    for zips in state_zips.values():
        before = list(zips)
        after = list(zips)
        shuffle(after)
        # Merging with update() also copes with an empty table, where a
        # reduce() without an initial value raises TypeError.
        shuffled_zipmap.update(zip(before, after))
    return shuffled_zipmap

def _get_cities_by_zip():
  """Build a randomized city -> city mapping in which names are only swapped within a state.

  Despite the name, the result is keyed by lower-cased city name, not by zip.
  Reads the ``state`` and ``primary_city`` columns of the module-level
  ``CITY_DF``. City names are lower-cased, shuffled among the cities of the
  same (lower-cased) state, and the per-state mappings are merged into one
  flat dict. A city name that exists in several states therefore keeps only
  the mapping from the state merged last.

  Returns:
      dict: lower-cased city name -> replacement lower-cased city name.
      Empty when the reference table has no usable rows.
  """
  # A single collect for the whole table instead of one Spark job per state.
  state_cities = defaultdict(set)
  for row in CITY_DF.select('state', 'primary_city').collect():
    state, city = row['state'], row['primary_city']
    # Null states never matched the per-state equality filter, and a null
    # city would otherwise be stringified into the bogus name "none".
    if state is None or city is None:
      continue
    city = str(city)
    # Blank names were never added by the former per-word loop either.
    if not city.strip():
      continue
    state_cities[str(state).lower()].add(city.lower())

  shuffled_citymap = {}
  for cities in state_cities.values():
    before = list(cities)
    after = list(cities)
    shuffle(after)
    # Merging with update() also copes with an empty table, where a
    # reduce() without an initial value raises TypeError.
    shuffled_citymap.update(zip(before, after))
  return shuffled_citymap
  
# input_string: str
def _get_business_orgs_map(business_org_df):
  business_df = business_org_df.collect()
  busines_org_shuffle = {}
  for row in business_df:
      busines_org_shuffle[row["employee_department_id"]] = row["ouput_employee_department"]
  return busines_org_shuffle

def remap_hris_business_orgs(df, column_name):
  """Replace the values of ``column_name`` with their dense rank.

  Rows where ``column_name`` is null are dropped. The remaining rows are
  ranked with ``dense_rank`` over a window ordered by ``column_name`` (no
  partitioning), and the rank takes the place of the original column.

  Returns:
      The filtered DataFrame with ``column_name`` holding the rank.
  """
  ranked_name = 'output_' + column_name
  rank_by_value = dense_rank().over(Window.orderBy(column_name))
  return (
      df.filter(f'{column_name} is not null')
        .withColumn(ranked_name, rank_by_value)
        .drop(column_name)
        .withColumnRenamed(ranked_name, column_name)
  )


def string_scramble(input_string: str) -> str:
    """Return a keyed hex digest of ``input_string``, cut to the input's length.

    The digest is HMAC-SHA256 keyed with ``SECRET_KEY``, so equal inputs give
    equal outputs for a given key. A SHA-256 hex digest has 64 characters, so
    the output is never longer than that. Falsy input (``None`` or ``''``)
    yields ``''``.
    """
    if not input_string:
        return ''
    digest = hmac.new(SECRET_KEY.encode(), input_string.encode(), hashlib.sha256).hexdigest()
    return digest[:len(input_string)]


def member_id_scramble(input_string: str) -> str:
    """Return a replacement member id derived from the last 32 characters of the input.

    Only the trailing 32 characters are hashed (unkeyed MD5), so any prefix
    in front of them is ignored and equal id portions always map to the same
    output. The result is ``"golden_dataset-"`` followed by the 32-character
    hex digest.

    Falsy input (``None`` or ``''``) yields ``''``, matching the other
    scramblers, instead of raising on ``None``.
    """
    if not input_string:
        return ''

    # Scramble the (up to) 32 character id portion of the member id consistently.
    og_id_string = input_string[-32:]
    result = hashlib.md5(og_id_string.encode()).hexdigest()

    # Tack on the fixed dataset prefix.
    return "golden_dataset-" + result
  

def date_shift(shift_days: int, input_date_string: str, random_shift_delta: int = 5):
    """Shift a date by ``shift_days`` plus a random jitter of up to +/- ``random_shift_delta`` days.

    Args:
        shift_days: Fixed number of days to add (may be negative).
        input_date_string: Anything ``pd.Timestamp`` can parse.
        random_shift_delta: Largest jitter in days, inclusive on both sides.
            ``0`` disables the jitter.

    Returns:
        The shifted date as a ``YYYY-MM-DD`` string, or ``''`` when the input
        is falsy, cannot be parsed, parses to NaT, or the shifted date falls
        outside the representable range.

    Raises:
        ValueError: if ``random_shift_delta`` is negative.
    """
    if not input_date_string:
        return ''

    # randrange excludes its upper bound, so the +1 makes the jitter symmetric
    # and keeps a delta of 0 valid. Computed outside the try block so that a
    # bad delta is reported rather than silently turned into ''.
    jitter_days = randrange(-random_shift_delta, random_shift_delta + 1)

    try:
        parsed = pd.Timestamp(input_date_string)
        if pd.isna(parsed):
            return ''
        shifted_date = parsed.date() + pd.Timedelta(days=shift_days + jitter_days)
    except (pd.errors.OutOfBoundsDatetime, ValueError, OverflowError):
        # Unparseable text raises ValueError; date arithmetic past the
        # supported range raises OverflowError.
        return ''

    return str(shifted_date)


def pass_null(input_data) -> str:
    """Ignore ``input_data`` and return an empty string (not ``None``)."""
    return ''
  

def remap_employer(input_data) -> str:
    """Ignore ``input_data`` and return the fixed placeholder employer name."""
    return "GOLDEN EMPLOYER"


def passthrough(input_data):
    """Return ``input_data`` unchanged."""
    return input_data


def financial_shift(input_amount, shift_percent: float = 0.37) -> float:
    """Inflate a monetary amount by a fixed fraction of itself.

    The shift is deterministic, not random: the result is
    ``amount + round(amount * shift_percent, 2)``.

    Args:
        input_amount: The amount as a float, ``Decimal``, numeric string or
            ``None``.
        shift_percent: Fraction of the amount to add. The default of 0.37
            adds 37%.

    Returns:
        A Python ``float``. Falsy input (``None``, ``0``, ``Decimal('0.00')``)
        yields ``0.0``.
    """
    if not input_amount:
        return 0.0

    amount = float(input_amount)
    # Only the added portion is rounded to cents, so the sum keeps any extra
    # precision the input carried.
    return amount + round(amount * shift_percent, 2)


def ssn_scramble(input_ssn):
    """Return a keyed, digits-only replacement with the same length as ``input_ssn``.

    The HMAC-SHA256 digest of the input (keyed with ``SECRET_KEY``) is read as
    one large integer, and the leading decimal digits of that integer are
    returned. The output contains digits only, so separators such as dashes
    in the input are not preserved. Falsy input yields ``''``.
    """
    if not input_ssn:
        return ''
    mac = hmac.new(SECRET_KEY.encode(), input_ssn.encode(), hashlib.sha256)
    decimal_digits = str(int(mac.hexdigest(), 16))
    return decimal_digits[:len(input_ssn)]


def claim_id_scramble(input_claim_number):
    """Return a keyed, digits-only replacement with the same length as the claim number.

    The HMAC-SHA256 digest of the input (keyed with ``SECRET_KEY``) is read as
    one large integer, and the leading decimal digits of that integer are
    returned. Falsy input yields ``''``.
    """
    if not input_claim_number:
        return ''
    raw_digest = hmac.new(
        SECRET_KEY.encode(),
        input_claim_number.encode(),
        hashlib.sha256,
    ).digest()
    # Big-endian bytes -> int is the same number as int(hexdigest, 16).
    decimal_digits = str(int.from_bytes(raw_digest, 'big'))
    return decimal_digits[:len(input_claim_number)]


def remap_zip(input_zip):
    """Look up the replacement for ``input_zip`` in the module-level ``shuffled_zipmap``.

    Returns ``None`` for a ``None`` input and for any zip missing from the map.
    """
    return None if input_zip is None else shuffled_zipmap.get(input_zip)
  
  
def remap_city(input_city):
  """Look up the replacement for ``input_city`` in the module-level ``cities_by_zip``.

  The lookup is done on the lower-cased name. Returns ``None`` for a ``None``
  input and for any city missing from the map.
  """
  return None if input_city is None else cities_by_zip.get(input_city.lower())


def remap_business_orgs(input_biz_org):
  """Look up the replacement for ``input_biz_org`` in the module-level ``business_org_map``.

  Returns ``None`` for a ``None`` input and for any value missing from the map.
  """
  return None if input_biz_org is None else business_org_map.get(input_biz_org)

# City/Zip Randomization

These are randomized each run to keep PII/PHI from being an issue with zip codes or cities of individual members.

In [0]:
# Build the lookup tables read by remap_zip and remap_city. Both builders call
# random.shuffle, so the mappings differ every time this cell is run.
shuffled_zipmap = _get_zip_shuffle()
cities_by_zip = _get_cities_by_zip()

# Conversions

If you need to add more conversions, this section is organized by file type for convenience: drop the list of translations in here, then register it in the filename <-> converters section.  This is a bit clunky, but until we agree on a more adequate format, it works and is easy to iterate on without much knowledge of Python or Spark.

## med conversions

In [0]:
# Each entry is a 3-tuple: (column name, type label, converter name).
# Every converter name used in this list matches a function defined in the
# Function Definitions section.
# NOTE(review): how the type label is consumed is not visible in this part of
# the notebook - check the run logic before introducing a new label.
med_translations = [
  ('employer_name', 'string', 'remap_employer'),
  ('paid_date', 'date', 'date_shift'),
  ('admit_date', 'date', 'date_shift'),
  ('discharge_date', 'date', 'date_shift'),
  ('from_date', 'date', 'date_shift'),
  ('member_birth_date', 'date', 'date_shift'),
  ('pass_through_03', 'date', 'pass_null'),
  ('to_date', 'date', 'date_shift'),
  ('allowed_amount', 'decimal_10_2', 'financial_shift'),
  ('billed_amount', 'decimal_10_2', 'financial_shift'),
  ('cob_amount', 'decimal_10_2', 'financial_shift'),
  ('coinsurance_amount', 'decimal_10_2', 'financial_shift'),
  ('copay_amount', 'decimal_10_2', 'financial_shift'),
  ('deductible_amount', 'decimal_10_2', 'financial_shift'),
  ('discount_amount', 'decimal_10_2', 'financial_shift'),
  ('eligible_amount', 'decimal_10_2', 'financial_shift'),
  ('hra_amount', 'decimal_10_2', 'financial_shift'),
  ('member_paid_amount', 'decimal_10_2', 'financial_shift'),
  ('net_adjustment', 'decimal_10_2', 'financial_shift'),
  ('not_covered_amount', 'decimal_10_2', 'financial_shift'),
  ('paid_amount', 'decimal_10_2', 'financial_shift'),
  ('units', 'float', 'passthrough'),
  ('admit_type', 'int', 'passthrough'),
  ('capitated_encounter_indicator', 'int', 'passthrough'),
  ('claim_line_index', 'int', 'passthrough'),
  ('covered_days', 'int', 'passthrough'),
  ('drg', 'int', 'passthrough'),
  ('mdc', 'int', 'passthrough'),
  ('member_gender', 'int', 'passthrough'),
  ('member_relationship', 'int', 'passthrough'),
  ('member_status', 'int', 'passthrough'),
  ('network_status_indicator', 'int', 'passthrough'),
  ('pass_through_02', 'int', 'pass_null'),
  ('premium_provider_indicator', 'int', 'passthrough'),
  ('adjustment_type', 'string', 'passthrough'),
  ('bill_type', 'string', 'passthrough'),
  ('billing_provider_address_line_1', 'string', 'pass_null'),
  ('billing_provider_address_line_2', 'string', 'pass_null'),
  ('billing_provider_name', 'string', 'pass_null'),
  ('billing_provider_npi', 'string', 'passthrough'),
  ('billing_provider_phone', 'string', 'pass_null'),
  ('billing_provider_specialty_code', 'string', 'pass_null'),
  ('billing_provider_state', 'string', 'passthrough'),
  ('billing_provider_tax_id', 'string', 'pass_null'),
  ('billing_provider_type', 'string', 'pass_null'),
  ('claim_line_status', 'string', 'passthrough'),
  ('claim_type', 'string', 'passthrough'),
  ('data_source', 'string', 'passthrough'),
  ('discharge_status', 'string', 'passthrough'),
  ('drg_type', 'string', 'passthrough'),
  ('icd_dx_poa_1', 'string', 'passthrough'),
  ('icd_dx_poa_2', 'string', 'passthrough'),
  ('icd_dx_poa_3', 'string', 'passthrough'),
  ('icd_dx_poa_4', 'string', 'passthrough'),
  ('icd_dx_poa_5', 'string', 'passthrough'),
  ('icd_dx_poa_6', 'string', 'passthrough'),
  ('icd10_dx_1', 'string', 'passthrough'),
  ('icd10_dx_2', 'string', 'passthrough'),
  ('icd10_dx_3', 'string', 'passthrough'),
  ('icd10_dx_4', 'string', 'passthrough'),
  ('icd10_dx_5', 'string', 'passthrough'),
  ('icd10_dx_6', 'string', 'passthrough'),
  ('icd10_proc_1', 'string', 'passthrough'),
  ('icd10_proc_2', 'string', 'passthrough'),
  ('icd10_proc_3', 'string', 'passthrough'),
  ('icd10_proc_4', 'string', 'passthrough'),
  ('icd10_proc_5', 'string', 'passthrough'),
  ('icd10_proc_6', 'string', 'passthrough'),
  ('icd9_dx_1', 'string', 'passthrough'),
  ('icd9_dx_2', 'string', 'passthrough'),
  ('icd9_dx_3', 'string', 'passthrough'),
  ('icd9_dx_4', 'string', 'passthrough'),
  ('icd9_dx_5', 'string', 'passthrough'),
  ('icd9_dx_6', 'string', 'passthrough'),
  ('icd9_proc_1', 'string', 'passthrough'),
  ('icd9_proc_2', 'string', 'passthrough'),
  ('icd9_proc_3', 'string', 'passthrough'),
  ('icd9_proc_4', 'string', 'passthrough'),
  ('icd9_proc_5', 'string', 'passthrough'),
  ('icd9_proc_6', 'string', 'passthrough'),
  ('member_first_name', 'string', 'string_scramble'),
  ('member_home_state', 'string', 'passthrough'),
  ('member_last_name', 'string', 'string_scramble'),
  ('member_middle_name', 'string', 'pass_null'),
  ('member_ssn', 'string', 'ssn_scramble'),
  ('nucc_taxonomy_specialty', 'string', 'passthrough'),
  ('pass_through_01', 'string', 'pass_null'),
  ('place_of_service_code', 'string', 'passthrough'),
  ('procedure_code', 'string', 'passthrough'),
  ('procedure_code_modifier', 'string', 'passthrough'),
  ('revenue_code', 'string', 'passthrough'),
  ('servicing_provider_address_line_1', 'string', 'pass_null'),
  ('servicing_provider_address_line_2', 'string', 'pass_null'),
  ('servicing_provider_name', 'string', 'pass_null'),
  ('servicing_provider_npi', 'string', 'passthrough'),
  ('servicing_provider_specialty_code', 'string', 'passthrough'),
  ('servicing_provider_state', 'string', 'passthrough'),
  ('servicing_provider_tax_id', 'string', 'pass_null'),
  ('servicing_provider_type', 'string', 'pass_null'),
  ('source_claim_id', 'string', 'claim_id_scramble'),
  ('source_member_id', 'string', 'pass_null'),
  ('subscriber_employee_id', 'string', 'pass_null'),
  ('subscriber_ssn', 'string', 'ssn_scramble'),
  ('type_of_service_code', 'string', 'passthrough'),
  ('unique_line_id', 'string', 'passthrough'),
  ('billing_provider_city', 'string', 'remap_city'),
  ('billing_provider_zip_code', 'string', 'remap_zip'),
  ('member_home_city', 'string', 'remap_city'),
  ('member_home_zip_code', 'string', 'remap_zip'),
  ('servicing_provider_city', 'string', 'remap_city'),
  ('servicing_provider_zip_code', 'string', 'remap_zip'),
]

# Pairs of (zip code column, city column) for the med file. Each pair names
# two columns that also appear in med_translations.
# NOTE(review): presumably used to keep a zip and its city together during
# remapping - confirm against the run logic.
med_zip_city = [
  ('servicing_provider_zip_code', 'servicing_provider_city'),
  ('member_home_zip_code', 'member_home_city'),
  ('billing_provider_zip_code', 'billing_provider_city'),
]

## rx conversions

In [0]:
# Each entry is a 3-tuple: (column name, type label, converter name).
# Every converter name used in this list matches a function defined in the
# Function Definitions section.
# NOTE(review): how the type label is consumed is not visible in this part of
# the notebook - check the run logic before introducing a new label.
rx_translations = [
  ("paid_date", "date", "date_shift"),
  ("employer_name", "string", "remap_employer"),
  ("fill_date", "date", "date_shift"),
  ("member_birth_date", "date", "date_shift"),
  ("allowed_amount", "decimal_10_2", "financial_shift"),
  ("billed_amount", "decimal_10_2", "financial_shift"),
  ("cob_amount", "decimal_10_2", "financial_shift"),
  ("coinsurance_amount", "decimal_10_2", "financial_shift"),
  ("copay_amount", "decimal_10_2", "financial_shift"),
  ("deductible_amount", "decimal_10_2", "financial_shift"),
  ("dispensing_fee", "decimal_10_2", "financial_shift"),
  ("eligible_amount", "decimal_10_2", "financial_shift"),
  ("ingredient_cost", "decimal_10_2", "financial_shift"),
  ("ingredient_cost_per_unit", "decimal_10_2", "financial_shift"),
  ("member_paid_amount", "decimal_10_2", "financial_shift"),
  ("paid_amount", "decimal_10_2", "financial_shift"),
  ("sales_tax_amount", "decimal_10_2", "financial_shift"),
  ("quantity_dispensed", "float", "passthrough"),
  ("daw_code", "int", "passthrough"),
  ("days_supply", "int", "passthrough"),
  ("mail_order_indicator", "int", "passthrough"),
  ("maintenance_drug_indicator", "int", "passthrough"),
  ("member_gender", "int", "passthrough"),
  ("member_relationship", "int", "passthrough"),
  ("member_status", "int", "passthrough"),
  ("new_prescription", "int", "passthrough"),
  ("pharmacy_dispenser_type", "int", "passthrough"),
  ("rx_over_the_counter_indicator", "int", "passthrough"),
  ("rx_tier", "int", "passthrough"),
  ("source_generic_indicator", "int", "passthrough"),
  ("specialty_rx_indicator", "int", "passthrough"),
  ("claim_line_status", "string", "passthrough"),
  ("data_source", "string", "passthrough"),
  ("denied_reason", "string", "passthrough"),
  ("dosage_form_code", "string", "passthrough"),
  ("drug_type", "string", "passthrough"),
  ("formulary_status", "string", "passthrough"),
  ("generic_brand_from_pbm", "string", "passthrough"),
  ("generic_brand_in_house", "string", "passthrough"),
  ("member_first_name", "string", "string_scramble"),
  ("member_home_state", "string", "passthrough"),
  ("member_last_name", "string", "string_scramble"),
  ("member_middle_name", "string", "pass_null"),
  ("member_ssn", "string", "ssn_scramble"),
  ("ndc", "string", "passthrough"),
  ("pharmacy_address_line_1", "string", "pass_null"),
  ("pharmacy_address_line_2", "string", "pass_null"),
  ("pharmacy_description", "string", "pass_null"),
  ("pharmacy_group", "string", "pass_null"),
  ("pharmacy_name", "string", "pass_null"),
  ("pharmacy_npi", "string", "passthrough"),
  ("pharmacy_phone_number", "string", "pass_null"),
  ("pharmacy_state", "string", "passthrough"),
  ("prescriber_address_line_1", "string", "pass_null"),
  ("prescriber_address_line_2", "string", "pass_null"),
  ("prescriber_first_name", "string", "pass_null"),
  ("prescriber_id", "string", "pass_null"),
  ("prescriber_last_name", "string", "pass_null"),
  ("prescriber_phone_number", "string", "pass_null"),
  ("prescriber_provider_name", "string", "pass_null"),
  ("prescriber_specialty_code", "string", "pass_null"),
  ("prescriber_state", "string", "passthrough"),
  ("prescriber_tax_id", "string", "pass_null"),
  ("source_claim_id", "string", "claim_id_scramble"),
  ("source_member_id", "string", "pass_null"),
  ("subscriber_ssn", "string", "ssn_scramble"),
  ("unique_line_id", "string", "pass_null"),
  ("unit_of_measure", "string", "passthrough"),
  ("upc_code", "string", "passthrough"),
  ("member_home_zip_code", "string", "remap_zip"),
  ("member_home_city", "string", "remap_city"),
  ("pharmacy_zip_code", "string", "remap_zip"),
  ("pharmacy_city", "string", "remap_city"),
  ("prescriber_zip_code", "string", "remap_zip"),
  ("prescriber_city", "string", "remap_city"),
]


# Pairs of (zip code column, city column) for the rx file. Each pair names
# two columns that also appear in rx_translations.
# NOTE(review): presumably used to keep a zip and its city together during
# remapping - confirm against the run logic.
rx_zip_city = [
  ('member_home_zip_code', 'member_home_city'),
  ('pharmacy_zip_code', 'pharmacy_city'),
  ('prescriber_zip_code', 'prescriber_city'),
]

## eligibility conversions

In [0]:
# These conversions are manually assigned in an excel spreadsheet by Chris or someone else - for POC we've just got this hard coded to show that it will work if we have the relationships defined.
# Each entry is a 3-tuple: (column name, type label, converter name).
# NOTE(review): this list labels decimals "decimal_base_ten_round_2" while the
# med, rx and hris lists use "decimal_10_2" - confirm the run logic handles both.
# NOTE(review): "remap_plan" (used for 401k_plan_name and rx_plan_name) has no
# matching function in the Function Definitions section - confirm it is
# defined or handled elsewhere.
eligibility_translations = [
  ("month_key", "date", "date_shift"),
  ("employer_name", "string", "remap_employer"),
  ("artemis_member_id", "string", "member_id_scramble"),
  ("dcra_end_date", "date", "date_shift"),
  ("dcra_start_date", "date", "date_shift"),
  ("dental_end_date", "date", "date_shift"),
  ("dental_start_date", "date", "date_shift"),
  ("fsa_end_date", "date", "date_shift"),
  ("fsa_start_date", "date", "date_shift"),
  ("hra_end_date", "date", "date_shift"),
  ("hra_start_date", "date", "date_shift"),
  ("hsa_end_date", "date", "date_shift"),
  ("hsa_start_date", "date", "date_shift"),
  ("long_disab_end_date", "date", "date_shift"),
  ("long_disab_start_date", "date", "date_shift"),
  ("medical_end_date", "date", "date_shift"),
  ("medical_start_date", "date", "date_shift"),
  ("member_birth_date", "date", "date_shift"),
  ("pass_through_03", "date", "date_shift"),
  ("record_import_date", "date", "date_shift"),
  ("rx_end_date", "date", "date_shift"),
  ("rx_start_date", "date", "date_shift"),
  ("short_disab_end_date", "date", "date_shift"),
  ("short_disab_start_date", "date", "date_shift"),
  ("subscriber_birth_date", "date", "date_shift"),
  ("vision_end_date", "date", "date_shift"),
  ("vision_start_date", "date", "date_shift"),
  ("dcra_admin_fees", "decimal_base_ten_round_2", "financial_shift"),
  ("dcra_payroll_contribution_employer", "decimal_base_ten_round_2", "financial_shift"),
  ("dcra_payroll_contribution_member", "decimal_base_ten_round_2", "financial_shift"),
  ("dental_admin_fees", "decimal_base_ten_round_2", "financial_shift"),
  ("dental_payroll_contribution_employer", "decimal_base_ten_round_2", "financial_shift"),
  ("dental_payroll_contribution_member", "decimal_base_ten_round_2", "financial_shift"),
  ("fsa_admin_fees", "decimal_base_ten_round_2", "financial_shift"),
  ("fsa_payroll_contribution_employer", "decimal_base_ten_round_2", "financial_shift"),
  ("fsa_payroll_contribution_member", "decimal_base_ten_round_2", "financial_shift"),
  ("hra_admin_fees", "decimal_base_ten_round_2", "financial_shift"),
  ("hra_payroll_contribution_employer", "decimal_base_ten_round_2", "financial_shift"),
  ("hra_payroll_contribution_member", "decimal_base_ten_round_2", "financial_shift"),
  ("hsa_admin_fees", "decimal_base_ten_round_2", "financial_shift"),
  ("hsa_election_amount", "decimal_base_ten_round_2", "financial_shift"),
  ("hsa_payroll_contribution_employer", "decimal_base_ten_round_2", "financial_shift"),
  ("hsa_payroll_contribution_member", "decimal_base_ten_round_2", "financial_shift"),
  ("life_abb_amount", "decimal_base_ten_round_2", "financial_shift"),
  ("long_disab_abb_amount", "decimal_base_ten_round_2", "financial_shift"),
  ("long_disab_admin_fees", "decimal_base_ten_round_2", "financial_shift"),
  ("long_disab_payroll_contribution_employer", "decimal_base_ten_round_2", "financial_shift"),
  ("long_disab_payroll_contribution_member", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_admin_fees", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_member_coinsurance_max", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_member_deductible", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_member_oop_max", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_payroll_contribution_employer", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_payroll_contribution_member", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_plan_coinsurance", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_plan_copay_advanced_imaging", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_plan_copay_behavioral_health_visit", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_plan_copay_chiropractor_visit", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_plan_copay_clinic_visit", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_plan_copay_emergency_room", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_plan_copay_individual_deductible", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_plan_copay_inpatient_hospital", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_plan_copay_outpatient_surgery", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_plan_copay_primary_care_visit", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_plan_copay_rehabilitation_therapy", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_plan_copay_routine_exam", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_plan_copay_specialist_office_visit", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_plan_copay_urgent_care_facility", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_plan_family_deductible", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_plan_family_oop_max", "decimal_base_ten_round_2", "financial_shift"),
  ("medical_plan_individual_oop_max", "decimal_base_ten_round_2", "financial_shift"),
  ("rx_admin_fees", "decimal_base_ten_round_2", "financial_shift"),
  ("rx_payroll_contribution_employer", "decimal_base_ten_round_2", "financial_shift"),
  ("rx_payroll_contribution_member", "decimal_base_ten_round_2", "financial_shift"),
  ("short_disab_admin_fees", "decimal_base_ten_round_2", "financial_shift"),
  ("short_disab_payroll_contribution_employer", "decimal_base_ten_round_2", "financial_shift"),
  ("short_disab_payroll_contribution_member", "decimal_base_ten_round_2", "financial_shift"),
  ("vision_admin_fees", "decimal_base_ten_round_2", "financial_shift"),
  ("vision_payroll_contribution_employer", "decimal_base_ten_round_2", "financial_shift"),
  ("vision_payroll_contribution_member", "decimal_base_ten_round_2", "financial_shift"),
  ("dcra_coverage_tier_code", "int", "passthrough"),
  ("dcra_enrollment_status", "int", "passthrough"),
  ("dental_coverage_tier_code", "int", "passthrough"),
  ("dental_enrollment_status", "int", "passthrough"),
  ("eligibility_marked_smoking", "int", "passthrough"),
  ("fsa_coverage_tier_code", "int", "passthrough"),
  ("fsa_enrollment_status", "int", "passthrough"),
  ("hra_coverage_tier_code", "int", "passthrough"),
  ("hra_enrollment_status", "int", "passthrough"),
  ("hsa_coverage_tier_code", "int", "passthrough"),
  ("hsa_enrollment_status", "int", "passthrough"),
  ("long_disab_coverage_tier_code", "int", "passthrough"),
  ("long_disab_enrollment_status", "int", "passthrough"),
  ("medical_coverage_tier_code", "int", "passthrough"),
  ("medical_enrollment_status", "int", "passthrough"),
  ("medical_plan_counts_for_eligibility", "int", "passthrough"),
  ("medical_plan_metal_level", "int", "passthrough"),
  ("medical_plan_network_status_indicator", "int", "passthrough"),
  ("member_gender", "int", "passthrough"),
  ("member_relationship", "int", "passthrough"),
  ("member_status", "int", "passthrough"),
  ("pass_through_02", "int", "passthrough"),
  ("rx_coverage_tier_code", "int", "passthrough"),
  ("rx_enrollment_status", "int", "passthrough"),
  ("short_disab_coverage_tier_code", "int", "passthrough"),
  ("short_disab_enrollment_status", "int", "passthrough"),
  ("subscriber_gender", "int", "passthrough"),
  ("tobacco_surcharge", "int", "passthrough"),
  ("tobacco_surcharge_spouse", "int", "passthrough"),
  ("vision_coverage_tier_code", "int", "passthrough"),
  ("vision_enrollment_status", "int", "passthrough"),
  ("401k_plan_name", "string", "remap_plan"),
  ("alternate_id", "string", "pass_null"),
  ("benefit_deduction_group", "string", "passthrough"),
  ("data_source", "string", "passthrough"),
  ("medical_carrier_name", "string", "passthrough"),
  ("medical_plan_category_name", "string", "passthrough"),
  ("medical_plan_description", "string", "passthrough"),
  ("medical_plan_name", "string", "passthrough"),
  ("medical_plan_type", "string", "passthrough"),
  ("member_email", "string", "pass_null"),
  ("member_first_name", "string", "string_scramble"),
  ("member_home_address_line_1", "string", "pass_null"),
  ("member_home_address_line_2", "string", "pass_null"),
  ("member_home_phone", "string", "pass_null"),
  ("member_home_state", "string", "passthrough"),
  ("member_last_name", "string", "string_scramble"),
  ("member_middle_name", "string", "pass_null"),
  ("member_ssn", "string", "ssn_scramble"),
  ("pass_through_01", "string", "pass_null"),
  ("primary_care_provider_first_name", "string", "pass_null"),
  ("primary_care_provider_last_name", "string", "pass_null"),
  ("primary_care_provider_npi", "string", "pass_null"),
  ("primary_care_provider_source_system_id", "string", "pass_null"),
  ("primary_care_provider_tin", "string", "pass_null"),
  ("rx_carrier_name", "string", "passthrough"),
  ("rx_plan_name", "string", "remap_plan"),
  ("source_401k_plan_id", "string", "pass_null"),
  ("source_carrier", "string", "passthrough"),
  ("source_dcra_plan_id", "string", "pass_null"),
  ("source_dental_plan_id", "string", "pass_null"),
  ("source_fsa_plan_id", "string", "pass_null"),
  ("source_hra_plan_id", "string", "pass_null"),
  ("source_hsa_plan_id", "string", "pass_null"),
  ("source_long_disab_plan_id", "string", "pass_null"),
  ("source_medical_plan_id", "string", "pass_null"),
  ("source_member_id", "string", "pass_null"),
  ("source_rx_plan_id", "string", "pass_null"),
  ("source_short_disab_plan_id", "string", "pass_null"),
  ("source_subscriber_id", "string", "pass_null"),
  ("source_vision_plan_id", "string", "pass_null"),
  ("subscriber_address_line_1", "string", "pass_null"),
  ("subscriber_address_line_2", "string", "pass_null"),
  ("subscriber_email", "string", "pass_null"),
  ("subscriber_employee_id", "string", "pass_null"),
  ("subscriber_first_name", "string", "string_scramble"),
  ("subscriber_id", "string", "pass_null"),
  ("subscriber_last_name", "string", "string_scramble"),
  ("subscriber_middle_name", "string", "pass_null"),
  ("subscriber_phone", "string", "pass_null"),
  ("subscriber_ssn", "string", "ssn_scramble"),
  ("subscriber_state", "string", "passthrough"),
  ("unique_line_id", "string", "pass_null"),
  ("member_home_zip_code", "string", "remap_zip"),
  ("member_home_city", "string", "remap_city"),
  ("subscriber_zip_code", "string", "remap_zip"),
  ("subscriber_city", "string", "remap_city"),
]

# Pairs of (zip code column, city column) for the eligibility file. Each pair
# names two columns that also appear in eligibility_translations.
# NOTE(review): presumably used to keep a zip and its city together during
# remapping - confirm against the run logic.
eligibility_zip_city = [
  ('member_home_zip_code', 'member_home_city'),
  ('subscriber_zip_code', 'subscriber_city'),
]

## hris conversions

In [0]:
# Each entry is a 3-tuple: (column name, type label, converter name).
# Every converter name used in this list matches a function defined in the
# Function Definitions section.
# NOTE(review): how the type label is consumed is not visible in this part of
# the notebook - check the run logic before introducing a new label.
hris_translations = [
  ("month_key", "date", "date_shift"),
  ("employer_name", "string", "remap_employer"),
  ("employee_hire_date", "date", "date_shift"),
  ("employee_last_hire_date", "date", "date_shift"),
  ("employee_seniority_date", "date", "date_shift"),
  ("employee_termination_date", "date", "date_shift"),
  ("member_birth_date", "date", "date_shift"),
  ("record_import_date", "date", "date_shift"),
  ("subscriber_birth_date", "date", "date_shift"),
  ("employee_salary_annual", "decimal_10_2", "financial_shift"),
  ("employee_wage_hourly", "decimal_10_2", "financial_shift"),
  ("medical_budget_rate", "decimal_10_2", "financial_shift"),
  ("employee_flsa_status", "int", "passthrough"),
  ("employee_is_union", "int", "passthrough"),
  ("employee_type", "int", "passthrough"),
  ("employee_wage_type", "int", "passthrough"),
  ("employee_years_of_service", "int", "passthrough"),
  ("marital_status", "int", "passthrough"),
  ("member_ethnicity", "int", "passthrough"),
  ("member_gender", "int", "passthrough"),
  ("member_relationship", "int", "passthrough"),
  ("member_status", "int", "passthrough"),
  ("subscriber_gender", "int", "passthrough"),
  ("alternate_id", "string", "pass_null"),
  ("building_section", "string", "passthrough"),
  ("data_source", "string", "passthrough"),
  ("employee_compensation_grade", "string", "passthrough"),
  ("employee_cost_center", "string", "passthrough"),
  ("employee_department", "string", "passthrough"),
  ("employee_department_id", "string", "passthrough"),
  ("employee_department_l2", "string", "passthrough"),
  ("employee_department_l3", "string", "passthrough"),
  ("employee_department_l4", "string", "passthrough"),
  ("employee_department_l5", "string", "passthrough"),
  ("employee_department_l6", "string", "passthrough"),
  ("employee_job_level", "string", "passthrough"),
  ("employee_job_title", "string", "passthrough"),
  ("employee_management_level", "string", "passthrough"),
  ("employee_manager_eeid", "string", "passthrough"),
  ("employee_territory", "string", "passthrough"),
  ("employee_wage_tier", "string", "passthrough"),
  ("member_birth_year", "string", "pass_null"),
  ("member_email", "string", "pass_null"),
  ("member_first_name", "string", "string_scramble"),
  ("member_home_address_line_1", "string", "pass_null"),
  ("member_home_address_line_2", "string", "pass_null"),
  ("member_home_phone", "string", "pass_null"),
  ("member_home_state", "string", "passthrough"),
  ("member_last_name", "string", "string_scramble"),
  ("member_middle_name", "string", "pass_null"),
  ("member_nationality", "string", "passthrough"),
  ("member_primary_language", "string", "passthrough"),
  ("member_ssn", "string", "ssn_scramble"),
  ("office_address_line_1", "string", "pass_null"),
  ("office_address_line_2", "string", "pass_null"),
  ("office_name", "string", "remap_business_orgs"),
  ("office_state", "string", "passthrough"),
  ("shift", "string", "passthrough"),
  ("source_member_id", "string", "pass_null"),
  ("subscriber_address_line_1", "string", "pass_null"),
  ("subscriber_address_line_2", "string", "pass_null"),
  ("subscriber_employee_id", "string", "pass_null"),
  ("subscriber_first_name", "string", "string_scramble"),
  ("subscriber_id", "string", "pass_null"),
  ("subscriber_last_name", "string", "string_scramble"),
  ("subscriber_middle_name", "string", "pass_null"),
  ("subscriber_ssn", "string", "ssn_scramble"),
  ("subscriber_state", "string", "passthrough"),
  ("subscriber_work_email", "string", "pass_null"),
  ("subscriber_work_phone", "string", "pass_null"),
  ("unique_line_id", "string", "pass_null"),
  ("working_spouse_contribution", "string", "passthrough"),
  ("office_zip_code", "string", "remap_zip"),
  ("office_city", "string", "remap_city"),
  ("member_home_zip_code", "string", "remap_zip"),
  ("member_home_city", "string", "remap_city"),
  ("subscriber_zip_code", "string", "remap_zip"),
  ("subscriber_city", "string", "remap_city"),

]

# Pairs of (zip code column, city column) for the hris file.
# NOTE(review): hris_translations also remaps office_zip_code and office_city,
# but that pair is not listed here - confirm the omission is intentional.
hris_zip_city = [
  ('member_home_zip_code', 'member_home_city'),
  ('subscriber_zip_code', 'subscriber_city'),
]

## filename <-> converters

If you have a new file type, add it here with the corresponding converters.

In [0]:
# Notebook parameters, supplied through Databricks text widgets. All three
# default to the empty string, and widget values are read back as strings.
dbutils.widgets.text("database_name", "")
dbutils.widgets.text("table_names", "")
dbutils.widgets.text("test_run", "")

database_name = dbutils.widgets.get("database_name")
# Accept "med,rx" as well as "med, rx", and drop empty entries (e.g. when the
# widget is left blank).
table_names = [
    name.strip()
    for name in dbutils.widgets.get("table_names").split(',')
    if name.strip()
]

# bool() of any non-empty string is True, so "False" or "0" used to switch
# test mode ON. Only blank and explicit "off" spellings now mean False; every
# other non-empty value still means True, as before.
TEST_RUN = dbutils.widgets.get("test_run").strip().lower() not in (
    "", "false", "f", "0", "no", "n", "off",
)

# Uncomment for testing in notebook
# database_name = "prod_wtw_ndc_2023_07_31_15_09_38_eb648a2d_7cd4_40ed_85a4_1c620d12030b"
# table_names = ["hris", "rx", "eligibility", "med"]
# TEST_RUN = True 

def get_filetype_translators(database_name: str, table_names: list):
    """Build the per-table work list that the run loop iterates over.

    For every recognised entry in ``table_names`` ('med', 'rx', 'eligibility', 'hris') a tuple of
    ``(table name, "<database_name>.<table name>", translations, zip/city pairs)`` is emitted.
    Unrecognised names are ignored, and the result keeps the order of ``table_names``.
    """
    # Each lookup is wrapped in a lambda so the module-level translation lists are only
    # resolved for the table types that were actually requested.
    config_for = {
        'med': lambda: (med_translations, med_zip_city),
        'rx': lambda: (rx_translations, rx_zip_city),
        'eligibility': lambda: (eligibility_translations, eligibility_zip_city),
        'hris': lambda: (hris_translations, hris_zip_city),
    }

    work_items = []
    for requested in table_names:
        resolve = config_for.get(requested)
        if resolve is None:
            continue
        translations, zip_city_pairs = resolve()
        qualified_table = f'{database_name}.{requested}'
        work_items.append((requested, qualified_table, translations, zip_city_pairs))
    return work_items

# Resolve the widget-supplied table names into the
# (file type, qualified table name, translations, zip/city pairs) tuples the run loop iterates over.
filetype_translators = get_filetype_translators(database_name, table_names)

# Define method to write single output file
def writeSingleFile(stagingDir: str, destinationDir: str, remove_dir: str):
  """Move the part file found in ``stagingDir`` to ``destinationDir``, then delete ``remove_dir``.

  The part file is located by scanning the string form of ``dbutils.fs.ls(stagingDir)`` for a
  path that starts with ``dbfs:<stagingDir>part`` and ends with ``00.csv``.

  Args:
    stagingDir: Directory to scan.  It is concatenated directly with "part", so it is expected
      to end with "/".
    destinationDir: Target path handed to ``dbutils.fs.mv``.
    remove_dir: Path passed to ``dbutils.fs.rm`` (with the recurse flag set) after the move.

  Raises:
    ValueError: If no matching part-file path appears in the listing.
  """
  scheme = "dbfs:"
  stagingDirFileInfo = str(dbutils.fs.ls(stagingDir))

  # initializing substrings -- if things break look here to confirm we are using correct substring
  begin = scheme + stagingDir + "part"
  end = "00.csv"

  # Find where the part-file path starts, then the first terminator AFTER that prefix, so an
  # earlier "00.csv" elsewhere in the listing cannot be picked up by mistake.
  path_start = stagingDirFileInfo.index(begin)
  path_end = stagingDirFileInfo.index(end, path_start + len(begin)) + len(end)
  part_file_path = stagingDirFileInfo[path_start:path_end]

  # Drop only the leading scheme.  str.strip('dbfs:') would remove any of the characters
  # d, b, f, s and ':' from BOTH ends of the string, which can eat into the path itself.
  new_file_name = part_file_path[len(scheme):]

  # log result
  print('file moved from folder:', new_file_name)
  print('to single file location:', destinationDir)

  dbutils.fs.mv(new_file_name, destinationDir, True)
  dbutils.fs.rm(remove_dir, True)

# Run Logic

This is the meat and potatoes of what we're doing in here - we're looping over the file types, pulling in the translators, limiting the columns to the ones we want, and then running the conversion process.

The converters are defined here as lambdas for convenience, because date_shift has to take in the date_diff computed from the original source data - otherwise you'd have to manually decide how much you want to shift your dates by.  Outside of the date shift logic needing to be rebuilt each run, there's not much about the converters that can't be defined above - so it may be simpler to define them up above and then "override" the date shift one with an updated shift amount.

Once that's done we upload them to S3

In [0]:
# Maximum number of rows kept per table when TEST_RUN is enabled.
ROW_COUNT = 1000
# Random fractional offset drawn once each time this cell runs: randrange(-10, 10) yields an
# integer in [-10, 9], so the value lies between -0.10 and 0.09 (and can be exactly 0).
# NOTE(review): presumably consumed by financial_shift, which is defined outside this section - confirm.
rand_shift_amount = Decimal(randrange(-10, 10)) / Decimal(100.0)

In [0]:
RUN_NUMBER += 1

# Loop through the types, table names, and translations and apply all the business logic we need to each of them.  The logic in the below loop can probably be
# abstracted better than this, but it doesn't make sense to pull it all into a function just to remove one level of nesting if it doesn't remove any of the 
# complexity.  Someone smarter than me should optimize this so it's not quite so jumbled.
# NOTE: zip_city_conversions is unpacked because each work item carries four fields, but nothing in this loop reads it.
for file_type, table_name, translations, zip_city_conversions in filetype_translators:
  print(f"Working on table_name {table_name}")
  
  # Get yo table son
  df = spark.read.table(table_name)
  
  # Add a limit like this for testing or it will take forever to run.
  if TEST_RUN:
    print(f"Limiting data frame for testing to {ROW_COUNT} rows.")
    df = df.limit(ROW_COUNT)
  else:
    print('we will NOT limit row count in output')

  # This line is filtering the columns available to the ones that we want in the conversion - without this we have a lot of untouched
  # columns, which may or may not be desired, but this way we've only got the columns that we know of so we've got more control of the
  # output dataset to make sure we don't accidentally expose things we didn't intend to - i.e. every column in the resulting data set
  # should be there intentionally, not by accident.
  df = df[[translation[0] for translation in translations]]

  # Right now this is a bit of a hack for which column we use for the date filter - there's got to be a better way of doing this, but without
  # getting complex in the input data structure for the translators and stuff, this is a much simpler solution that works for the existing
  # file types that we know of.  As much as I'd like a good data structure for the input configuration, this is 30s of code to write to 
  # alter filtering requirements based on the file type...so...this is what you get for now
  if file_type in ('eligibility', 'hris'):

    # Compare with == here: `file_type in ('hris')` is a substring test against the plain string
    # 'hris' (the parentheses do not make a tuple), so it only worked by accident.
    if file_type == 'hris':
      hris_org_columns = (
        "employee_department",
        "employee_department_l2",
        "employee_department_l3",
        "employee_department_l4",
        "employee_department_l5",
        "employee_department_l6",
        "office_name",
      )
      for org_column in hris_org_columns:
        df = remap_hris_business_orgs(df, org_column)

    if file_type == 'eligibility':
      for plan_column in ("medical_plan_name", "medical_plan_description"):
        df = remap_hris_business_orgs(df, plan_column)

    max_date = df.select(max(df.month_key)).collect()[0]['max(month_key)']

  else:
    max_date = df.select(max(df.paid_date)).collect()[0]['max(paid_date)']

  # Take "today" once so the fallback and the subtraction below cannot straddle midnight.
  today = pd.Timestamp.now().date()

  # If the max date is none we're going to default to no shift for now, but this is an area we might be able to improve the logic
  # to look at other columns or something like that so we aren't left with a non-shifted dataset.
  if max_date is None:
    max_date = today

  date_diff = today - max_date

  # All converters used/implemented
  converters = {
    # NOTE: Date diff must be calculated from the individual files - that converter needs to be instantiated per file
    # The rest aren't really something that needs to be instantiated on their own, but there's not much overhead to
    # instantiating them each time so it seems like it makes more sense to do it every time right now so that it's
    # easier to logic rather than having lots of complexity.
    'string_scramble': udf(lambda x: string_scramble(x), StringType()),
    'date_shift': udf(lambda x: date_shift(date_diff.days, x), StringType()),
    'ssn_scramble': udf(lambda x: ssn_scramble(x), StringType()),
    # NOTE(review): DecimalType() with no arguments defaults to precision 10, scale 0 - confirm the
    # shifted amounts are not meant to keep fractional digits.
    'financial_shift': udf(lambda x: financial_shift(x), DecimalType()),
    'remap_zip': udf(lambda x: remap_zip(x), StringType()),
    # NOTE(review): remap_city is applied twice here - confirm the double remap is intentional.
    'remap_city': udf(lambda x: remap_city(remap_city(x)), StringType()), 
    'remap_plan': udf(lambda x: string_scramble(x), StringType()),
    'remap_employer': udf(lambda x: remap_employer(x), StringType()),
    'member_id_scramble': udf(lambda artemis_member_id: member_id_scramble(artemis_member_id), StringType()), 
    'claim_id_scramble': udf(lambda x: claim_id_scramble(x), StringType()),
    'pass_null': udf(lambda x: '', StringType())
  }
  
  # Loop over the translations for each column - if it's a pass through there's nothing to do, otherwise apply the 
  # lambda that's appropriate for that column.
  for column_name, datatype, function_name in translations:

    # Don't do anything to the base dataframe should just continue rather than implementing a function that does
    # a distributed command of "Don't do anything".  That don't make no sense.
    if function_name == "passthrough":
      print(f"No actions to take for passthrough function on {column_name}")
      continue

    converter = converters.get(function_name)

    # Some function names (e.g. remap_business_orgs) have no UDF in the dict above; those columns are
    # handled by the remap_hris_business_orgs calls earlier in this loop.  Raising here would break
    # them, so keep skipping - but say so, because a silently skipped column is written out as-is.
    if converter is None:
      print(f"No converter registered for function {function_name} - leaving column {column_name} as-is")
      continue

    print(f"Converting column name {column_name} with converter {function_name}")
    df = df.withColumn(column_name, converter(col(column_name)))
  
  # Once we've finished converting the dataframe we want to write it to S3 with some print statements so it's somewhat obvious
  # where we're at in the process.

  file_dir = f'{BASE_TARGET_DIRECTORY}{COMPANY_NAME}/{RUN_DATE}/{RUN_NUMBER}/{file_type}/{file_type}.csv'
  print(f"Uploading {file_type} to {file_dir}")
  df.coalesce(1).write.options(header=True).csv(file_dir)
  print(f"Uploaded {file_type} to {file_dir}")

  # Spark writes a directory of part files; move the single part file out to a flat location
  # and remove the staging directory for this file type.
  staging_dir = f'{file_dir}/'
  destination_dir = f'{BASE_TARGET_DIRECTORY}{COMPANY_NAME}/{RUN_DATE}/{RUN_NUMBER}/single_file/{file_type}.csv'
  remove_dir = f'{BASE_TARGET_DIRECTORY}{COMPANY_NAME}/{RUN_DATE}/{RUN_NUMBER}/{file_type}/'

  writeSingleFile(staging_dir, destination_dir, remove_dir)
  
print(f"Completed run {RUN_NUMBER}")

Working on table_name prod_wtw_ndc_2023_07_31_15_09_38_eb648a2d_7cd4_40ed_85a4_1c620d12030b.hris
Limiting data frame for testing to 1000 rows.
Converting column name month_key with converter date_shift
Converting column name employer_name with converter remap_employer
Converting column name employee_hire_date with converter date_shift
Converting column name employee_last_hire_date with converter date_shift
Converting column name employee_seniority_date with converter date_shift
Converting column name employee_termination_date with converter date_shift
Converting column name member_birth_date with converter date_shift
Converting column name record_import_date with converter date_shift
Converting column name subscriber_birth_date with converter date_shift
Converting column name employee_salary_annual with converter financial_shift
Converting column name employee_wage_hourly with converter financial_shift
Converting column name medical_budget_rate with converter financial_shift
No actions