In [None]:
#Importing all the required modules

from sqlalchemy.engine import create_engine
import pandas as pd
import datetime
from sqlalchemy.dialects.oracle import DATE, VARCHAR2, FLOAT, NUMBER,TIMESTAMP
from sqlalchemy import text,String,types
from datetime import datetime
from db_connections import fastpaceanalysis,datawarehouse_prod,AceExtraction

In [None]:
# Database handles used by the rest of the notebook.
# Each name imported from db_connections is called once and then rebound to
# whatever that call returns, so re-running this cell without restarting the
# kernel calls the returned object rather than the original factory.

AceExtraction = AceExtraction()
fastpaceanalysis = fastpaceanalysis()
datawarehouse = datawarehouse_prod()

In [None]:
# Extract clearing-house rejection rows from fastpaceanalysis.
# CHRD is shorthand for ClearingHouse_RejectionDetails.
# NOTE(review): the object yielded by begin() is never bound; read_sql is handed
# `fastpaceanalysis` itself, exactly as before.

with fastpaceanalysis.begin():
    try:
        print('Extracting_data_from source')

        query_for_CHRD = text('''SELECT  Patient_FirstName,Patient_LastName,Provider_LastName,Insurance_Claim_ID,Invoice_Number,Claim_Amount,Reason,Reject_By,Reject_Message,Rejected_Code,
                                    Status,Insurance_Name,Reject_Date,Service_Date
                                    FROM FastPaceAnalysis.ClearingHouse_RejectionDetails where Service_Date >= '2024-04-01';''')

        CHRD = pd.read_sql(query_for_CHRD,fastpaceanalysis)

        # len() is the row count; DataFrame.count() would print a per-column
        # Series of non-null counts instead of a single number.
        print('Data succesfully read from source. Records count is',len(CHRD))

    except Exception as e:
        print('Error: ',e)
        # Later cells need CHRD; re-raise so a failed read stops the run instead
        # of continuing with CHRD undefined or left over from an earlier run.
        raise

In [None]:
# Read the dimension tables from datawarehouse; each frame is later left-joined
# onto the rejection rows to look up a dimension id.
# NOTE(review): the object yielded by begin() is never bound; every read_sql is
# handed `datawarehouse` itself, exactly as before.

with datawarehouse.begin():

    try:
        print('Reading Data from Dimension tables')

        # Insurance firms: ins_firm_id, ins_firm_name
        query_for_insurance_firms = text(''' SELECT ins_firm_id,ins_firm_name FROM datawarehouse.insurance_firms_dim; ''')
        ins_firm = pd.read_sql(query_for_insurance_firms,datawarehouse)

        # Calendar: cal_date_id, cal_date
        query_for_calendar = text(''' SELECT cal_date_id,cal_date FROM datawarehouse.calendar_dim; ''')
        calendar = pd.read_sql(query_for_calendar,datawarehouse)

        # Service providers: service_provider_id plus first and last name
        query_for_service_provider = text(''' SELECT service_provider_id,service_provider_first_name,service_provider_last_name FROM datawarehouse.service_provider_dim; ''')
        service_provider = pd.read_sql(query_for_service_provider,datawarehouse)

        # Patients: pat_dim_id, pat_source_key
        query_for_patient = text('''SELECT pat_dim_id,pat_source_key FROM datawarehouse.patient_dim; ''')
        patient = pd.read_sql(query_for_patient,datawarehouse)

        # Clinics: clinic_dim_id, clinic_name
        query_for_clinic = text('''SELECT clinic_dim_id,clinic_name FROM datawarehouse.clinic_dim;  ''')
        clinic = pd.read_sql(query_for_clinic,datawarehouse)

        # Contract types: contract_type_id, contract_type_desc
        query_for_contract_type = text('''SELECT contract_type_id,contract_type_desc   FROM datawarehouse.contract_type_dim;''')
        contract_type = pd.read_sql(query_for_contract_type,datawarehouse)

        print('Completed Reading Data from Dimension tables')

    except Exception as e:
        print('Error: ',e)
        # Every frame above is needed by a later join; re-raise so a failed read
        # stops the run instead of leaving some frames undefined or stale.
        raise

In [None]:
# Extract encounter rows from AceExtraction (clients starting with 'FP',
# encounters on or after 2024-04-01).
# NOTE(review): the object yielded by begin() is never bound; read_sql is handed
# `AceExtraction` itself, exactly as before.

with AceExtraction.begin():
    try:
        print('Reading Data From source, Medrit_input_table')

        query_for_MEDRIT_INPUT_TABLE = text('''SELECT Encounter_Date,Patient_Account_Number,Patient_Name,
                                               Facility_Name,Appointment_Provider_Name,GC_NGC,
                                                Patient_Primary_Insurance_Name
                                                FROM AceExtraction.Medrite_input_table
                                                where client like 'FP%' and Encounter_Date >= '2024-04-01';''')

        MEDRITE_INPUT_TABLE = pd.read_sql(query_for_MEDRIT_INPUT_TABLE,AceExtraction)

        print('Completed reading Data From source, Medrit_input_table')

    except Exception as e:
        print('Error: ',e)
        # Later cells need MEDRITE_INPUT_TABLE; re-raise so a failed read stops
        # the run instead of continuing with it undefined or stale.
        raise

In [None]:
# Cast the columns later used as merge keys to str, so both sides of each join
# compare as strings.

for frame, column in (
    (CHRD, 'Reject_Date'),
    (CHRD, 'Service_Date'),
    (calendar, 'cal_date'),
    (MEDRITE_INPUT_TABLE, 'Patient_Account_Number'),
    (MEDRITE_INPUT_TABLE, 'Encounter_Date'),
    (patient, 'pat_source_key'),
):
    frame[column] = frame[column].astype(str)

In [None]:
# Empty frame whose columns mirror the target fact table; a later cell fills it
# column by column.

fact_columns = [
    'claim_rejection_txn_id', 'insurance_claim_id', 'invoice_number', 'claim_amount',
    'insurance_firms_dim_id', 'service_provider_dim_id', 'reject_date_id', 'patient_dim_id',
    'service_date_id', 'clinic_dim_id', 'contract_type_dim_id',
    'rejection_reason', 'rejected_by', 'reject_message', 'reject_code', 'reject_status',
    'created_date',
]
clearing_house_rejection_fact = pd.DataFrame(columns=fact_columns)

In [None]:
print('Transformation Process begins')

# Patient_Name is split at the first comma: the text after it (stripped) becomes
# the first name; the first space-delimited token of the text before it becomes
# the last name.
# NOTE(review): name_parts[1] presumably does not exist when no Patient_Name
# contains a comma (or the table is empty) — confirm against real data.
name_parts = MEDRITE_INPUT_TABLE['Patient_Name'].str.split(",", n=1, expand=True)

# One row is kept per (first name, last name, encounter date) combination.
MEDRITE_INPUT_TABLE = (
    MEDRITE_INPUT_TABLE
    .assign(
        input_patient_first_name=name_parts[1].str.strip(),
        input_patient_last_name=name_parts[0].str.split(" ", n=1, expand=True)[0],
    )
    .drop_duplicates(subset=['input_patient_first_name', 'input_patient_last_name', 'Encounter_Date'])
)



In [None]:
# Left-join rejections to encounters on patient first name, last name and date;
# rejections without a matching encounter are kept with empty encounter columns.
rejection_keys = ['Patient_FirstName', 'Patient_LastName', 'Service_Date']
encounter_keys = ['input_patient_first_name', 'input_patient_last_name', 'Encounter_Date']

df = CHRD.merge(
    MEDRITE_INPUT_TABLE,
    how='left',
    left_on=rejection_keys,
    right_on=encounter_keys,
)

In [None]:
# Appointment_Provider_Name is split at the first comma: text before it is the
# last name, text after it is the first name.
# NOTE(review): names[1] presumably does not exist when no provider name
# contains a comma — confirm against real data.
names = df['Appointment_Provider_Name'].str.split(",", n=1, expand=True)
df['service_provider_first_name'] = names[1]
df['service_provider_last_name'] = names[0]

# Strip surrounding whitespace so the join against the service provider
# dimension is not defeated by padding.
for name_column in ('service_provider_first_name', 'service_provider_last_name'):
    df[name_column] = df[name_column].str.strip()

In [None]:
# Look up ins_firm_id by matching Insurance_Name to ins_firm_name.

df1 = df.merge(
    ins_firm,
    how='left',
    left_on='Insurance_Name',
    right_on='ins_firm_name',
)

In [None]:
# Look up the calendar id for Reject_Date.
# cal_date_id is renamed to reject_date_id straight away because the calendar
# is joined a second time for Service_Date and would otherwise collide.

df2 = df1.merge(
    calendar,
    how='left',
    left_on='Reject_Date',
    right_on='cal_date',
).rename(columns={'cal_date_id': 'reject_date_id'})



In [None]:
# Join df2 with the calendar again, this time on Service_Date, and expose the
# matched cal_date_id as service_date_id.

df3 = df2.merge(
    calendar,
    how='left',
    left_on='Service_Date',
    right_on='cal_date',
).rename(columns={'cal_date_id': 'service_date_id'})

In [None]:
# Join with the service provider dimension on first and last name to pick up
# service_provider_id. Both frames use the same key column names.

provider_name_keys = ['service_provider_first_name', 'service_provider_last_name']
df5 = df3.merge(service_provider, how='left', on=provider_name_keys)

In [None]:
# Join with the patient dimension: Patient_Account_Number is matched against
# pat_source_key to pick up pat_dim_id.

df6 = df5.merge(
    patient,
    how='left',
    left_on='Patient_Account_Number',
    right_on='pat_source_key',
)


               

In [None]:
# Join with the clinic dimension: Facility_Name is matched against clinic_name
# to pick up clinic_dim_id.

df7 = df6.merge(
    clinic,
    how='left',
    left_on='Facility_Name',
    right_on='clinic_name',
)

In [None]:
# Join df7 with the contract type dimension: GC_NGC is matched against
# contract_type_desc to pick up contract_type_id.

df8 = df7.merge(
    contract_type,
    how='left',
    left_on='GC_NGC',
    right_on='contract_type_desc',
)

In [None]:
# Keep only the columns that feed the fact table, in fact-table order.
fact_source_columns = [
    'Insurance_Claim_ID', 'Invoice_Number', 'Claim_Amount',
    'ins_firm_id', 'service_provider_id', 'reject_date_id',
    'pat_dim_id', 'service_date_id', 'clinic_dim_id', 'contract_type_id',
    'Reason', 'Reject_By', 'Reject_Message', 'Rejected_Code', 'Status',
]
df9 = df8[fact_source_columns]

In [None]:
# Running sequence 1..N, one value per df9 row; used to build the transaction ids.
row_total = len(df9)
new_ids = pd.Series(range(1, row_total + 1))

In [None]:
# Fill the fact frame column by column from df9.

# Surrogate key: 'cl_hs_rj_' followed by the sequence number, zero-padded to at
# least three digits (longer numbers are not truncated).
clearing_house_rejection_fact['claim_rejection_txn_id'] = [f'cl_hs_rj_{str(seq).zfill(3)}' for seq in new_ids]

# Fact column -> df9 column that supplies it.
# 'invoice_number' was previously written to a misspelled column
# ('invoice_numberInsurance_Claim_ID'), which left invoice_number empty and
# added a stray column to the loaded table.
fact_to_source = {
    'insurance_claim_id': 'Insurance_Claim_ID',
    'invoice_number': 'Invoice_Number',
    'claim_amount': 'Claim_Amount',
    'insurance_firms_dim_id': 'ins_firm_id',
    'service_provider_dim_id': 'service_provider_id',
    'reject_date_id': 'reject_date_id',
    'patient_dim_id': 'pat_dim_id',
    'service_date_id': 'service_date_id',
    'clinic_dim_id': 'clinic_dim_id',
    'contract_type_dim_id': 'contract_type_id',
    'rejection_reason': 'Reason',
    'rejected_by': 'Reject_By',
    'reject_message': 'Reject_Message',
    'reject_code': 'Rejected_Code',
    'reject_status': 'Status',
}
for fact_column, source_column in fact_to_source.items():
    # list(...) drops df9's index so values are assigned by position.
    clearing_house_rejection_fact[fact_column] = list(df9[source_column])

# Load date, as a 'YYYY-MM-DD' string, repeated on every row.
clearing_house_rejection_fact['created_date']=datetime.now().strftime('%Y-%m-%d')

In [None]:
# SQL column types passed to to_sql for the fact table.
# NOTE(review): service_provider_dim_id, patient_dim_id, clinic_dim_id and
# contract_type_dim_id have no entry here, so their types are left to to_sql's
# inference — confirm that is intended.

data_types = {
    'claim_rejection_txn_id': types.String(50),
    'insurance_claim_id': types.String(50), 
    'invoice_number': types.String(50),      
    'claim_amount': types.Float(),
    'insurance_firms_dim_id': types.String(50),
    'reject_date_id': types.String(50),     
    'service_date_id': types.String(50),    
    'rejection_reason': types.String(255),
    'rejected_by': types.String(50),
    'reject_message': types.String(255),
    'reject_code': types.String(50),
    'reject_status': types.String(50),
    'created_date': types.Date()
}

# Was `Print(...)`, which raises NameError: the builtin is lowercase `print`.
print('Transformation Process completed')

In [None]:
# Load the fact frame into the warehouse table 'clearing_house_rejection_fact_1'.
# if_exists='replace' drops and recreates the table on every run.
# NOTE(review): the success message says Dev_Env while `datawarehouse` comes
# from datawarehouse_prod() — confirm which environment is the target.
try:
    clearing_house_rejection_fact.to_sql('clearing_house_rejection_fact_1',datawarehouse, if_exists='replace', index=False,dtype=data_types)
    print('successfully loaded into Data_Warehouse_Dev_Env')
except Exception as e:
    print("An error occurred while loading to Datawarehouse:", e)
    # Re-raise so a failed load is reported as a failure rather than ending the
    # notebook as if it had succeeded.
    raise