In [47]:
import pandas as pd
import numpy as np
from scipy import stats
from datetime import datetime

# Display settings
pd.set_option("display.max_columns", None)

# File path (update if needed)
file_path = "Dataset\Healthcare Insurance Claims Management Dataset.xlsx"

# Load workbook
xls = pd.ExcelFile(file_path)

# Load all sheets into a dictionary
dfs = {sheet: pd.read_excel(file_path, sheet_name=sheet) for sheet in xls.sheet_names}

dfs.keys()


dict_keys(['Claims_Fact_raw', 'Provider_Master_raw', 'Member_Master_raw', 'Policy_Master_raw', 'Procedure_Master', 'Diagnosis_Master_ICD'])

In [48]:

for name, df in dfs.items():
    print(f"\n===== {name} =====")
    print("Rows:", len(df))
    print("Columns:", df.columns.tolist())



===== Claims_Fact_raw =====
Rows: 12000
Columns: ['ClaimID', 'MemberID', 'PolicyID', 'ProviderID', 'ClaimType', 'ClaimSubmissionDate', 'AdmissionDate', 'DischargeDate', 'ClaimedAmount', 'ApprovedAmount', 'ICDCode(s)', 'ProcedureCode(s)', 'PreAuthNumber', 'PreAuthStatus', 'QueryRaiseDate', 'SettlementDate', 'TPANotes', 'Currency', 'NetworkStatus']

===== Provider_Master_raw =====
Rows: 350
Columns: ['ProviderID', 'ProviderName', 'Location', 'City', 'State', 'NetworkType', 'PackageRates']

===== Member_Master_raw =====
Rows: 9000
Columns: ['MemberID', 'AgeBand', 'Gender', 'City', 'State', 'PolicyID']

===== Policy_Master_raw =====
Rows: 3000
Columns: ['PolicyID', 'PolicyType', 'CoverageLimit', 'StartDate', 'EndDate', 'PremiumAmount']

===== Procedure_Master =====
Rows: 120
Columns: ['ProcedureCode', 'ProcedureCategory', 'StandardRate', 'LOSStandard']

===== Diagnosis_Master_ICD =====
Rows: 60
Columns: ['ICDCode', 'DiagnosisCategory', 'ValidProcedureCategories']


In [49]:
claims = dfs['Claims_Fact_raw']
providers = dfs['Provider_Master_raw']
members = dfs['Member_Master_raw']
policies = dfs['Policy_Master_raw']
procedures = dfs['Procedure_Master']
diagnosis = dfs['Diagnosis_Master_ICD']

In [50]:
audit_log = []

def log_issue(table, column, issue_type, rule, count, sample=None):
    audit_log.append({
        "table": table,
        "column": column,
        "issue_type": issue_type,
        "rule": rule,
        "rows_affected": count,
        "sample_values": sample
    })


In [51]:
# ----------------------------------------------
# 5. Primary Key Duplicate Checks
# ----------------------------------------------

# Claims_Fact: ClaimID
dups = claims[claims.duplicated(subset=['ClaimID'], keep=False)]
log_issue("Claims_Fact", "ClaimID", "Duplicate Key", "ClaimID must be unique", 
          len(dups), dups['ClaimID'].head().tolist())

# Provider
dups = providers[providers.duplicated(subset=['ProviderID'], keep=False)]
log_issue("Provider_Master", "ProviderID", "Duplicate Key", "ProviderID must be unique", 
          len(dups))

# Member
dups = members[members.duplicated(subset=['MemberID'], keep=False)]
log_issue("Member_Master", "MemberID", "Duplicate Key", "MemberID must be unique", 
          len(dups))

# Policy
dups = policies[policies.duplicated(subset=['PolicyID'], keep=False)]
log_issue("Policy_Master", "PolicyID", "Duplicate Key", "PolicyID must be unique", 
          len(dups))


In [52]:
# ----------------------------------------------
# 6. Normalize Dates
# ----------------------------------------------
date_cols = [
    'ClaimSubmissionDate','AdmissionDate','DischargeDate','QueryRaiseDate','SettlementDate'
]

for col in date_cols:
    if col in claims.columns:
        claims[col] = pd.to_datetime(claims[col], errors='coerce')


# Date Logic Audit

In [53]:
invalid_stay = claims[claims['DischargeDate'] < claims['AdmissionDate']]
log_issue("Claims_Fact", "DischargeDate", "Invalid Date Order",
          "DischargeDate < AdmissionDate", len(invalid_stay),
          invalid_stay[['ClaimID','AdmissionDate','DischargeDate']].head().to_dict())


In [54]:
invalid_cycle = claims[claims['SettlementDate'] < claims['ClaimSubmissionDate']]
log_issue("Claims_Fact", "SettlementDate", "Invalid Date Order",
          "SettlementDate < ClaimSubmissionDate", len(invalid_cycle))


In [55]:
missing_discharge = claims[claims['DischargeDate'].isna()]
log_issue("Claims_Fact", "DischargeDate", "Missing", "DischargeDate is NULL",
          len(missing_discharge))

missing_settlement = claims[claims['SettlementDate'].isna()]
log_issue("Claims_Fact", "SettlementDate", "Missing", "SettlementDate is NULL",
          len(missing_settlement))


In [56]:
invalid_amounts = claims[claims['ApprovedAmount'] > claims['ClaimedAmount']]
log_issue("Claims_Fact", "ApprovedAmount", "Invalid Amount",
          "ApprovedAmount > ClaimedAmount", len(invalid_amounts))


In [57]:
neg_amount = claims[claims['ClaimedAmount'] < 0]
log_issue("Claims_Fact", "ClaimedAmount", "Negative Amount",
          "ClaimedAmount < 0", len(neg_amount))


# Outlier Detection (Z-Score)

In [58]:
#Merge procedure categories first
claims = claims.merge(
    procedures[['ProcedureCode','ProcedureCategory','StandardRate']],
    left_on='ProcedureCode(s)', right_on='ProcedureCode',
    how='left'
)


In [59]:
claims['z_score_claim'] = claims.groupby('ProcedureCategory')['ClaimedAmount']\
                                .transform(lambda x: (x - x.mean())/x.std(ddof=0))

outliers = claims[claims['z_score_claim'].abs() >= 3]

log_issue("Claims_Fact", "ClaimedAmount", "Outlier",
          "Z-score >= 3", len(outliers))


In [60]:
claims['ICD_list'] = claims['ICDCode(s)'].astype(str).str.split('|')

In [61]:
claims_exploded = claims.explode('ICD_list')

In [62]:
invalid_icd_rows = claims_exploded[
    ~claims_exploded['ICD_list'].isin(diagnosis['ICDCode'])
]


In [63]:
invalid_by_claim = invalid_icd_rows.groupby('ClaimID')['ICD_list'].apply(list)

In [64]:
log_issue(
    "Claims_Fact",
    "ICDCode(s)",
    "Invalid Code",
    "Contains ICD codes not present in Diagnosis_Master",
    len(invalid_by_claim),
    invalid_by_claim.head().to_dict()
)
invalid_by_claim.head()

Series([], Name: ICD_list, dtype: object)

In [65]:
# invalid_proc = claims[~claims['ProcedureCode(s)'].isin(procedures['ProcedureCode'])]
# log_issue("Claims_Fact", "ProcedureCode(s)", "Invalid Code",
#           "ProcedureCode not found in Procedure_Master", len(invalid_proc),invalid_proc['ProcedureCode(s)'].head().tolist())

In [71]:
policy_check = claims.merge(
    policies[['PolicyID','StartDate','EndDate']],
    on='PolicyID',
    how='left'
)

invalid_policy = policy_check[
    ((policy_check['ClaimSubmissionDate'] >= policy_check['StartDate']) &
      (policy_check['ClaimSubmissionDate'] <= policy_check['EndDate']))
]

log_issue("Claims_Fact", "PolicyID", "Invalid Coverage",
          "Admission date outside policy period", len(invalid_policy))


In [72]:
len(invalid_policy)

2565

In [68]:
audit_df = pd.DataFrame(audit_log)
audit_df.to_csv("audit_log.csv", index=False)

audit_df.head()


Unnamed: 0,table,column,issue_type,rule,rows_affected,sample_values
0,Claims_Fact,ClaimID,Duplicate Key,ClaimID must be unique,0,[]
1,Provider_Master,ProviderID,Duplicate Key,ProviderID must be unique,0,
2,Member_Master,MemberID,Duplicate Key,MemberID must be unique,0,
3,Policy_Master,PolicyID,Duplicate Key,PolicyID must be unique,0,
4,Claims_Fact,DischargeDate,Invalid Date Order,DischargeDate < AdmissionDate,0,"{'ClaimID': {}, 'AdmissionDate': {}, 'Discharg..."


In [69]:
claims.to_csv("Claims_Fact_clean.csv", index=False)
providers.to_csv("Provider_Master_clean.csv", index=False)
members.to_csv("Member_Master_clean.csv", index=False)
policies.to_csv("Policy_Master_clean.csv", index=False)
