In [0]:
# Set ups
# Spark built in libraries 
import pyspark.sql.functions as f 
import pyspark.sql.types as t
from pyspark.sql.window import Window

# Native Python Libraries 
import pandas as pd
import os
import datetime as dt
import numpy as np
import json

In [0]:
def Banfield_Data_Dictionary(path="/dbfs/FileStore/tables/Banfield_Data_Dictionary.json"):
  """
  Load the Banfield Data Dictionary (built from the Confluence data dictionary) from a .json file.
  
  The notebook that converts the XLS data dictionaries to JSON can be found at: 
  /Users/tim.gottgetreu@effem.com/Functions/Convert XLS Data Dictionaries to JSON_FUNCTION
  
  Args:
    path (str): Location of the JSON file. Defaults to the DBFS path used historically.
  
  Returns:
    dict: The parsed JSON content. Its expected structure (keyed by table name) is printed on load.
  """
  # Use a context manager so the file handle is always closed, even if json.load raises.
  with open(path, encoding="utf-8") as dict_file:
    data_dictionary = json.load(dict_file)
  
  print( """The Banfield Data Dictionary is Loaded.
            Structure of dictionary:
            
            {"TABLE_NAME" = {'PRIMARY_KEY':'Primary Key as identified in the schema.txt file.'
                             'UNIQUE_KEYS':['List of Unique Keys indentified as 'UNIQUE in the meta data notes.],
                             'TABLE_COMMENTS': ["Any comments found in the TABLE dictionary sheet containing 
                                                 information for a table as a whole"],
                             'COLUMNS':{'COLUMN_NAME':{
                                         'DATA_TYPE': Date Type,
                                         'DATA_LENGTH': Lenght of Data,
                                         'DATA_PRECISION': Precision of data,
                                         'NULLABLE': 'Y or N',
                                         'Column Number': Column location,
                                         'Extract Notes': Notes on extraction,
                                         'Metadata Notes': 'Notes about the column var'
                             }}}


  
        """)
  
  return data_dictionary
  
  
  

In [0]:
def dedupData(df, primary_keys, dedupe_columns):
  """
  Keep one row per primary key: the row ranked first by the ingestion column(s), descending.
  
  Rows are partitioned by `primary_keys` and ordered by `dedupe_columns` in descending order.
  The first column listed is the most significant sort key. The top row of each partition is kept.
  
  Args:
    df              (spark dataframe): A spark dataframe
    primary_keys        (list of str): Primary key columns by which to partition.
                                       A single column name may also be passed as a str.
    dedupe_columns      (list of str): Columns indicating the ingestion date/timestamp/job id for the data,
                                       most significant first. A single column name may also be passed as a str.
    
  Returns:
    deduped_df      (spark dataframe): The df with exactly one row per distinct primary-key combination.
  """
  # A bare string would otherwise be unpacked/iterated character by character.
  if isinstance(primary_keys, str):
    primary_keys = [primary_keys]
  if isinstance(dedupe_columns, str):
    dedupe_columns = [dedupe_columns]
  
  # NOTE(review): "most recent" assumes larger DW_JOB_ID / EXTRACTION_TIMESTAMP values are newer — confirm.
  window_dedup = (
    Window
    .partitionBy(*primary_keys)
    .orderBy(*[f.col(col_name).desc() for col_name in dedupe_columns]))
  
  # row_number() (unlike rank()) yields exactly one row numbered 1 per partition.
  # The result is therefore already unique per key and needs no extra .distinct() shuffle.
  # Ties on every dedupe column are broken arbitrarily by Spark.
  deduped_df = (
    df
    .withColumn("DATA_PROCESS_RANK", f.row_number().over(window_dedup))
    .filter("DATA_PROCESS_RANK = 1")
    .drop("DATA_PROCESS_RANK"))
  
  return deduped_df

In [0]:
def invoice_spend(df_invoice, date_range):
  """
  Filter TB_CMF_INVC_LINE_ITEM down to selected invoice line items within a date window.
  
  Source: /Users/saima.rahman@effem.com/Projects/04. Pet Insurance/2019/00. Banfield Pets 2019 (18-19)
  
  No aggregation is performed here. The result is line-item rows that a caller can sum
  (e.g. over line_item_revenue_amt) to get total spend.
  
  Args:
    df_invoice      (spark dataframe): The TB_CMF_INVC_LINE_ITEM spark dataframe
    date_range      (list): [start_date, end_date]. Only the first two items are used.
                            Each is converted with str() and the range is inclusive on both ends.
  
  Returns:
    (spark dataframe): Distinct rows of the selected invoice columns that pass all filters.
  """
  selected_columns = [
    'DW_CLIENT_ID', 'DW_PET_ID', 'DW_ITEM_ID', 'DW_INVC_LINE_ITEM_ID', 'DW_HOSP_ID',
    'PET_VST_BEGIN_DT', 'DW_PET_VST_ID', 'DW_PET_VST_TYP_ID', 'DW_PET_VST_REASON_ID',
    'DW_PET_VST_STATUS_ID', 'DW_REVENUE_TYPE_ID', 'line_item_revenue_amt',
  ]
  
  window_start = str(date_range[0])
  window_end = str(date_range[1])
  
  # NOTE(review): if PET_VST_BEGIN_DT is a timestamp rather than a date, a bare 'YYYY-MM-DD'
  # end bound means midnight and excludes the rest of that day — confirm the column type.
  in_window = f.col('PET_VST_BEGIN_DT').between(window_start, window_end)
  # Status / revenue-type codes below are kept as-is.
  # TODO document their meanings from the Banfield data dictionary.
  invoice_status_ok = f.col('dw_pet_vst_invc_status_id').isin([6, 10])
  line_status_ok = f.col('dw_invc_line_status_id').isin([17, 30])
  revenue_type_ok = f.col('DW_REVENUE_TYPE_ID').isin([3, 4])
  
  keep_row = in_window & invoice_status_ok & line_status_ok & revenue_type_ok
  
  filtered = df_invoice.filter(keep_row)
  return filtered.select(*selected_columns).distinct()

In [0]:
# Per-table dedup configuration, presumably used as dedupData() arguments:
#   'Primary_Keys'    -> columns to partition by (one row kept per key combination)
#   'de_dupe_columns' -> columns sorted descending to choose which row to keep
de_dup_key_dict = {
    'TB_CMD_CLIENT': {
        'Primary_Keys': ['DW_CLIENT_ID'],
        'de_dupe_columns': ['DW_JOB_ID', 'EXTRACTION_TIMESTAMP'],
    },
    'TB_CMD_HOSP': {
        'Primary_Keys': ['DW_HOSP_ID'],  # DW_HOSP_ID or HOSP_ABBRV (see hosp_abb_dedup)
        'de_dupe_columns': ['DW_JOB_ID', 'EXTRACTION_TIMESTAMP'],
    },
    'TB_CMD_PET': {
        'Primary_Keys': ['DW_PET_ID'],
        'de_dupe_columns': ['DW_JOB_ID', 'EXTRACTION_TIMESTAMP'],
    },
    'TB_CMF_APPT_DETAIL': {
        'Primary_Keys': ['DW_APPT_DETAIL_ID'],
        'de_dupe_columns': ['EXTRACTION_TIMESTAMP'],
    },
    'TB_CMD_PET_VST_TYP': {
        'Primary_Keys': ['DW_PET_VST_TYP_ID'],
        'de_dupe_columns': ['DW_JOB_ID'],
    },
    'TB_CMD_PET_VST_STATUS': {
        'Primary_Keys': ['DW_PET_VST_STATUS_ID'],
        'de_dupe_columns': ['DW_JOB_ID'],
    },
    'TB_CMD_APPT_STATUS': {
        'Primary_Keys': ['DW_APPT_STATUS_ID'],
        'de_dupe_columns': ['DW_JOB_ID'],
    },
    'TB_CMF_PET_VST_EVENT': {
        'Primary_Keys': ['DW_PET_VST_ID'],
        'de_dupe_columns': ['DW_JOB_ID'],
    },
    'TB_CMF_TM_HRS_SCHED': {
        'Primary_Keys': ['SRC_SCHEDHASHID'],
        'de_dupe_columns': ['DW_JOB_ID', 'EXTRACTION_TIMESTAMP'],
    },
    # 'TB_CMS_TM_HRS_SCHED': {
    #     'Primary_Keys': ['SRC_SCHEDHASHID'],
    #     'de_dupe_columns': ['DW_JOB_ID', 'EXTRACTION_TIMESTAMP'],
    # },
    'TB_CMF_PW_TM_DAY_HOSP_HOURS': {
        'Primary_Keys': ['DW_PW_TM_DAY_HOSP_HOURS_ID'],
        'de_dupe_columns': ['DW_JOB_ID'],
    },
    'TB_CMF_DOC_INPT_PET_VST': {
        'Primary_Keys': ['DW_DOC_INPT_PET_VST_ID'],
        'de_dupe_columns': ['DW_JOB_ID'],
    },
    'TB_CMD_CLNDR': {
        'Primary_Keys': ['DW_CLNDR_ID'],
        'de_dupe_columns': ['DW_JOB_ID', 'EXTRACTION_TIMESTAMP'],
    },
    'TB_CMF_INVC_LINE_ITEM': {
        'Primary_Keys': ['DW_INVC_LINE_ITEM_ID'],
        'de_dupe_columns': ['DW_JOB_ID', 'EXTRACTION_TIMESTAMP'],
    },
    'TB_CMF_DX_EVENT': {
        'Primary_Keys': ['DW_DX_ID'],
        'de_dupe_columns': ['DW_JOB_ID', 'EXTRACTION_TIMESTAMP'],
    },
    'TB_CMF_HOSP_BEST': {
        'Primary_Keys': ['DW_HOSP_ID'],  # alternative: ['HOSP_ABBRV']
        'de_dupe_columns': ['DW_JOB_ID', 'EXTRACTION_TIMESTAMP'],
    },
    'TB_CMF_OSAT_SRVY_SESSION': {
        'Primary_Keys': ['SRC_OSAT_CD'],
        'de_dupe_columns': ['DW_JOB_ID', 'EXTRACTION_TIMESTAMP'],
    },
    'TB_CMF_WP_SUMMARY': {
        'Primary_Keys': ['DW_WP_ID'],
        'de_dupe_columns': ['DW_JOB_ID', 'EXTRACTION_TIMESTAMP'],
    },
}

# SRC_SCHEDHASHID  SRC_CLOCK_CD DW_OSAT_RESPONSE_ID




In [0]:
def hosp_abb_dedup(raw_data_folder="/mnt/pdp-read/KYTE/BANFIELD/"):
  """
  Load the TB_CMD_HOSP Delta table and keep one row per hospital abbreviation (HOSP_ABBRV).
  
  Rows sharing a HOSP_ABBRV are reduced to the one ranked first by DW_JOB_ID, then
  EXTRACTION_TIMESTAMP, both descending (via dedupData).
  
  Args:
    raw_data_folder (str): Root folder containing the Banfield Delta tables.
                           Defaults to the original mount path.
  
  Returns:
    (spark dataframe): TB_CMD_HOSP deduplicated on HOSP_ABBRV.
  """
  # Relies on the notebook-provided `spark` session (e.g. Databricks).
  hosp_df = spark.read.format('delta').load(os.path.join(raw_data_folder, "TB_CMD_HOSP/"))
  # Keyed on HOSP_ABBRV here, rather than DW_HOSP_ID as in de_dup_key_dict['TB_CMD_HOSP'].
  return dedupData(hosp_df, ['HOSP_ABBRV'], ['DW_JOB_ID', 'EXTRACTION_TIMESTAMP'])

<strong>Tim's Banfield functions and objects are loaded:</strong>
- dedupData()
- Banfield_Data_Dictionary()
- invoice_spend()
- de_dup_key_dict{}
- hosp_abb_dedup()