# Hospital Patient Claims
## Staging to Curated 

####  Run this cell to set up and start your interactive session.


In [None]:
%idle_timeout 10
%glue_version 5.0
%worker_type G.1X
%number_of_workers 2

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import * 
from pyspark.sql.types import * 
from awsglue import DynamicFrame
import json 
from datetime import date
import boto3
from botocore.exceptions import ClientError
  
# Reuse the active SparkContext if one exists; otherwise start a new one.
sc = SparkContext.getOrCreate()
# Glue wrapper around the Spark context; used below for the Redshift reads and writes.
glueContext = GlueContext(sc)
# SparkSession used for plain DataFrame / SQL work (CSV read, date dimension).
spark = glueContext.spark_session
job = Job(glueContext)
# Resolve the `is_init_load` job argument from the command line.
# NOTE(review): the value comes from sys.argv, so it is presumably a string and
# needs explicit parsing before being used as a boolean — confirm downstream use.
args = getResolvedOptions(sys.argv, ['is_init_load'])

In [None]:
# Reuse the SparkContext that the session set-up cell already created instead
# of constructing a second one: Spark allows only one active SparkContext per
# process, so a bare SparkContext() raises once a context exists.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

In [None]:
def get_secret(secret_name="dev/motor_theft_vehicles/redshift_connection",
               region_name="us-east-1"):
    """Fetch the string payload of a secret from AWS Secrets Manager.

    Args:
        secret_name: Id of the secret to read. Defaults to the id this
            notebook has always used.
        region_name: AWS region of the Secrets Manager endpoint.

    Returns:
        The ``SecretString`` value of the secret, unparsed.

    Raises:
        botocore.exceptions.ClientError: Propagated unchanged when the
            ``get_secret_value`` call fails.
    """
    # NOTE(review): the default secret id mentions "motor_theft_vehicles" while
    # this notebook loads hospital patient claims — confirm it is the right secret.
    client = boto3.client(
        service_name='secretsmanager',
        region_name=region_name,
    )
    # No try/except here: the previous handler only re-raised the same
    # ClientError, so letting it propagate is equivalent.
    response = client.get_secret_value(SecretId=secret_name)
    return response['SecretString']


# The secret payload is JSON text; parse it into a dict of connection settings.
db_config = json.loads(get_secret())

In [None]:
# Redshift connection settings shared by the reads and writes below, taken
# from the parsed secret.
my_conn_options = dict(
    url=db_config['dev_url'],
    user=db_config['dev_username'],
    password=db_config['dev_password'],
    redshiftTmpDir=db_config['dev_redshift_temp_directory'],
)

In [None]:
# Job arguments are parsed from sys.argv and therefore arrive as strings; any
# non-empty string (including "false" or "0") is truthy, so convert the flag to
# a real boolean before it is used in `if initial_load:`.
initial_load = str(args['is_init_load']).strip().lower() in ('true', '1', 'yes', 'y')

In [2]:
from awsglue.context import GlueContext
from pyspark.context import SparkContext
import awsglue.transforms as  T
import  pyspark.sql.functions as  F
from pyspark.sql.types import * 
from pyspark.sql import SparkSession
from awsglue.utils import *
from awsglue.dynamicframe import DynamicFrame
import sys  
from datetime import datetime 
import pandas as pd
import boto3

# getOrCreate() returns the already-running SparkContext when there is one and
# starts a new one otherwise; a bare SparkContext() raises if a context is
# already active in this process (earlier cells create one).
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Current local date rendered as a 'YYYY-MM-DD' string.
todays_date = f"{datetime.today():%Y-%m-%d}"

In [4]:
# Read the staged patient-claims CSV; the first row supplies the column names.
df = (
    spark.read
    .option('header', True)
    .csv('data/staging_csv/staging_data_patients_claims.csv')
)

In [None]:
# NOTE(review): this cell held only the tail of a call (an argument list ending
# in an unmatched ")"), which is a SyntaxError. It is restored here as a
# projection of the staged CSV frame onto the claim columns — confirm that `df`
# is the frame the original statement selected from.
df.select('claim_id','claim_status','claim_request_amount','claim_initialized_date','claim_rejected_reason','source_file_name','source_file_path','load_timestamp')


In [None]:
# Incremental-load table.
# Read it as a DynamicFrame: the frames derived below are handed to
# glueContext.write_dynamic_frame, which takes DynamicFrames, and the catalog
# reader's arguments are `database` / `table_name` (the previous
# `database_name` keyword is not accepted as the database).
# NOTE(review): assumes the table is registered in the Glue Data Catalog with a
# Redshift connection that supplies the credentials — confirm.
dyf_incremental_load = glueContext.create_dynamic_frame.from_catalog(
    database='db_patient_claims_staging',
    table_name='staging_claims_incremental_load',
    # S3 staging directory used when reading from Redshift.
    redshift_tmp_dir=my_conn_options['redshiftTmpDir'],
)

# Column projections use select_fields, the DynamicFrame counterpart of
# DataFrame.select (DynamicFrame has no `.select`).

# claim table
dyf_staging_claims = dyf_incremental_load.select_fields([
    'claim_id', 'claim_status', 'claim_request_amount', 'claim_initialized_date',
    'claim_rejected_reason', 'source_file_name', 'source_file_path', 'load_timestamp',
])

# patient table
dyf_staging_patient = dyf_incremental_load.select_fields([
    'patient_id', 'name_prefix', 'first_name', 'last_name',
    'date_of_birth', 'phone_number', 'email_id',
])

# policy table
# NOTE(review): 'preimum_amount' looks like a misspelling of "premium", but it
# matches the column name used in the policy merge SQL, so it is kept as-is.
dyf_staging_policy = dyf_incremental_load.select_fields([
    'policy_id', 'policy_start_date', 'policy_end_date', 'preimum_amount', 'coverage_limit',
])

# address table
dyf_staging_address = dyf_incremental_load.select_fields([
    'address_id', 'address_line_1', 'address_line_2', 'city', 'state', 'zip_code',
])

In [None]:
# SQL run after the staging write: flags the current dim_claims row of every
# staged claim_id as no longer current (stamping its end date), then inserts
# the staged rows into dim_claims.
# NOTE(review): the INSERT does not set is_current / effective dates;
# presumably column defaults supply them — confirm.
claims_merge_query_post_action ="""

BEGIN;

UPDATE dim_claims 
SET is_current = 'N', effective_end_date = getdate()
FROM staging_claims
WHERE staging_claims.claim_id = dim_claims.claim_id
and dim_claims.is_current = 'Y'
;

INSERT INTO dim_claims(claim_id,claim_status,claim_request_amount,claim_initialized_date,claim_rejected_reason,source_file_name,source_file_path,load_timestamp)
SELECT claim_id,claim_status,claim_request_amount,claim_initialized_date,claim_rejected_reason,source_file_name,source_file_path,load_timestamp
FROM staging_claims;

COMMIT;
"""
# Load staging_claims: empty the staging table first, write the claim columns,
# then let the post-action merge them into the dimension.
my_conn_options['dbtable'] = "staging_claims"
my_conn_options.update(
    preactions=f"TRUNCATE {my_conn_options['dbtable']}",
    postactions=claims_merge_query_post_action,
)
glueContext.write_dynamic_frame.from_options(
    frame=dyf_staging_claims,
    connection_type="redshift",
    connection_options=my_conn_options,
)

In [None]:
# SQL run after the staging write: flags the current dim_patients row of every
# staged patient_id as no longer current (stamping its end date), then inserts
# the staged rows into dim_patients.
patient_merge_query_post_action ="""

BEGIN;

UPDATE dim_patients 
SET is_current = 'N', effective_end_date = getdate()
FROM staging_patients
WHERE staging_patients.patient_id = dim_patients.patient_id
and dim_patients.is_current = 'Y'
;

INSERT INTO dim_patients(patient_id,name_prefix,first_name,last_name,date_of_birth,phone_number,email_id)
SELECT patient_id,name_prefix,first_name,last_name,date_of_birth,phone_number,email_id
FROM staging_patients;

COMMIT;
"""
# loading Staging Patients
my_conn_options['dbtable'] = "staging_patients"
my_conn_options['preactions'] = "TRUNCATE "+my_conn_options['dbtable']
# Run the patient merge after this write (previously the claims merge was
# attached here, so dim_patients was never updated).
my_conn_options['postactions'] = patient_merge_query_post_action
glueContext.write_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options = my_conn_options,
    # Write the patient columns (previously the claims frame was written into
    # staging_patients).
    frame = dyf_staging_patient,
)

In [None]:
# SQL run after the staging write: flags the current dim_policy row of every
# staged policy_id as no longer current (stamping its end date), then inserts
# the staged rows into dim_policy.
policy_merge_query_post_action ="""

BEGIN;

UPDATE dim_policy 
SET is_current = 'N', effective_end_date = getdate()
FROM staging_policy
WHERE staging_policy.policy_id = dim_policy.policy_id
and dim_policy.is_current = 'Y'
;

INSERT INTO dim_policy(policy_id,policy_start_date,policy_end_date,preimum_amount,coverage_limit)
SELECT policy_id,policy_start_date,policy_end_date,preimum_amount,coverage_limit
FROM staging_policy;

COMMIT;
"""
# loading Staging Policy
my_conn_options['dbtable'] = "staging_policy"
my_conn_options['preactions'] = "TRUNCATE "+my_conn_options['dbtable']
my_conn_options['postactions'] = policy_merge_query_post_action
glueContext.write_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options = my_conn_options,
    # Write the policy columns (previously the claims frame was written into
    # staging_policy).
    frame = dyf_staging_policy,
)

In [None]:
# SQL run after the staging write: flags the current dim_address row of every
# staged address_id as no longer current (stamping its end date), then inserts
# the staged rows into dim_address.
# The UPDATE's join condition now references dim_address; it previously
# referenced dim_policy.address_id, a table that is not part of the statement.
address_merge_query_post_action ="""

BEGIN;

UPDATE dim_address 
SET is_current = 'N', effective_end_date = getdate()
FROM staging_address
WHERE staging_address.address_id = dim_address.address_id
and dim_address.is_current = 'Y'
;

INSERT INTO dim_address(address_id,address_line_1,address_line_2,city,state,zip_code)
SELECT address_id,address_line_1,address_line_2,city,state,zip_code
FROM staging_address;

COMMIT;
"""
# loading Staging Address
my_conn_options['dbtable'] = "staging_address"
my_conn_options['preactions'] = "TRUNCATE "+my_conn_options['dbtable']
my_conn_options['postactions'] = address_merge_query_post_action
glueContext.write_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options = my_conn_options,
    # Write the address columns (previously the claims frame was written into
    # staging_address).
    frame = dyf_staging_address,
)

In [None]:
# Build and load the date dimension, only on the initial load.
# `initial_load` may still be the raw job-argument string, and any non-empty
# string (including "false") is truthy, so normalise it before testing. The
# check also accepts a real boolean.
if str(initial_load).strip().lower() in ('true', '1', 'yes', 'y'):
    # One row per month start from 2000-01-01 through 2030-01-01.
    dim_date = spark.sql('''
    SELECT explode(sequence(to_date('2000-01-01'), to_date('2030-01-01'), interval 1 months)) as date
    ''')
    # NOTE(review): 'MMdd' on a monthly sequence repeats every year (e.g. '0101'),
    # so date_id is not unique across years — confirm the intended key format.
    dim_date_cols = {
        "date_id": date_format(dim_date.date, 'MMdd'),
        "month": date_format(dim_date.date, 'M'),
        "month_short": date_format(dim_date.date, "LLL"),
        "month_long": date_format(dim_date.date, "LLLL"),
        "year_short": date_format(dim_date.date, 'yy'),
        "year_long": date_format(dim_date.date, 'yyyy'),
        "quarter": ceil(date_format(dim_date.date, 'M') / 3),
    }
    # Add the derived columns, then drop the raw date they were computed from.
    dim_date = dim_date.withColumns(dim_date_cols).drop('date')
    # DynamicFrame.fromDF is the conversion used elsewhere in this notebook;
    # GlueContext has no fromDF method.
    dyf_dim_date = DynamicFrame.fromDF(dim_date, glueContext, 'dim_date')
    # loading Curated Dim Date
    # Use a separate options dict without the pre/post actions left behind by
    # the preceding staging loads; otherwise this write would truncate a
    # staging table and re-run that table's merge.
    dim_date_conn_options = {
        key: value
        for key, value in my_conn_options.items()
        if key not in ('preactions', 'postactions')
    }
    dim_date_conn_options['dbtable'] = "dim_date"
    glueContext.write_dynamic_frame.from_options(
        connection_type="redshift",
        connection_options=dim_date_conn_options,
        # Write the date dimension built above (the previous frame name,
        # dyf_make_model, is not defined anywhere in this notebook).
        frame=dyf_dim_date,
    )

In [None]:
# SQL run after the incremental staging write: resolves each staged row to the
# current surrogate keys of the four dimensions and inserts one fact row.
#
# Changes from the previous version of this statement:
#   * the INSERT column list now has one column per SELECT expression, in the
#     same order (it listed six columns for five values, in a different order);
#   * is_current / effective_end_date are qualified with their table, since all
#     four joined dimensions are filtered on those same column names;
#   * the patient dimension is referenced as dim_patients, the name used by the
#     patient merge cell.
# NOTE(review): fact_claim_id was dropped from the column list on the
# assumption that it is generated by the table — confirm.
# NOTE(review): the target table name "staging_thefts_load" looks carried over
# from another project — confirm the real fact table name.
fact_table_insert_query_post_action ="""

INSERT INTO staging_thefts_load(patient_id,claim_id,address_id,policy_ref_id,load_datetime)
(
SELECT
dim_patients.patient_ref_id,
dim_claims.claims_ref_id,
dim_address.address_ref_id,
dim_policy.policy_ref_id,
getdate() load_datetime
FROM staging_claims_incremental_load
join dim_patients on staging_claims_incremental_load.patient_id = dim_patients.patient_id and dim_patients.is_current = 'Y' and dim_patients.effective_end_date is NULL
join dim_claims on staging_claims_incremental_load.claim_id = dim_claims.claim_id and dim_claims.is_current = 'Y' and dim_claims.effective_end_date is NULL
join dim_address on staging_claims_incremental_load.address_id = dim_address.address_id and dim_address.is_current = 'Y' and dim_address.effective_end_date is NULL
join dim_policy on staging_claims_incremental_load.policy_id = dim_policy.policy_id and dim_policy.is_current = 'Y' and dim_policy.effective_end_date is NULL
)

"""
# loading Staging Claims Incremental Load
# NOTE(review): the previous code converted an undefined `staging_incidents`
# frame. `df` (the staged CSV read earlier) is the only Spark DataFrame in this
# notebook, so it is assumed to be the intended source — confirm.
dyf_staging_incremental_load = DynamicFrame.fromDF(df, glueContext, 'sm')
# Use a separate options dict without the `preactions` left behind by an
# earlier staging load; otherwise this write would first truncate that other
# staging table.
# NOTE(review): no TRUNCATE of staging_claims_incremental_load is issued here,
# matching the previous code — confirm whether the table should be emptied first.
fact_conn_options = {
    key: value
    for key, value in my_conn_options.items()
    if key != 'preactions'
}
fact_conn_options['dbtable'] = "staging_claims_incremental_load"
fact_conn_options['postactions'] = fact_table_insert_query_post_action
glueContext.write_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options=fact_conn_options,
    frame=dyf_staging_incremental_load,
)