In [1]:
### Insurance Fraud Detection & Analysis - Capstone Project
# Fully Structured Jupyter Notebook with SQL, Data Analysis, and Streamlit Dashboard

# Import necessary libraries
import pandas as pd
import sqlite3
import matplotlib.pyplot as plt
import seaborn as sns
import streamlit as st
import unittest

# -------------------------
# SECTION 1: DATA INGESTION
# -------------------------

# Connect to SQLite database
conn = sqlite3.connect("insurance_fraud.db")

# Load datasets into Pandas DataFrames
beneficiaries_df = pd.read_csv(r"C:\Users\kikas\OneDrive\projects\Health_insurance_fraud\Data\Train_Beneficiarydata-1542865627584.csv")
inpatient_df = pd.read_csv(r"C:\Users\kikas\Desktop\Python\Capstone project\Code\Data\Train_Inpatientdata-1542865627584.csv")
outpatient_df = pd.read_csv(r"C:\Users\kikas\Desktop\Python\Capstone project\Code\Data\Train_Outpatientdata-1542865627584.csv")
providers_df = pd.read_csv(r"C:\Users\kikas\Desktop\Python\Capstone project\Code\Data\Train-1542865627584.csv")

# Insert data into SQL tables
beneficiaries_df.to_sql("beneficiaries", conn, if_exists="replace", index=False)
inpatient_df.to_sql("inpatient_claims", conn, if_exists="replace", index=False)
outpatient_df.to_sql("outpatient_claims", conn, if_exists="replace", index=False)
providers_df.to_sql("providers", conn, if_exists="replace", index=False)

print("Data successfully loaded into SQLite database!")

# -------------------------
# SECTION 2: DATA CLEANING
# -------------------------

# Convert date columns to datetime format
inpatient_df["ClaimStartDt"] = pd.to_datetime(inpatient_df["ClaimStartDt"], errors='coerce')
outpatient_df["ClaimStartDt"] = pd.to_datetime(outpatient_df["ClaimStartDt"], errors='coerce')

# Fill missing values
inpatient_df.fillna("Unknown", inplace=True)
outpatient_df.fillna("Unknown", inplace=True)

# Remove duplicate claims
inpatient_df.drop_duplicates(subset=["ClaimID"], keep="first", inplace=True)
outpatient_df.drop_duplicates(subset=["ClaimID"], keep="first", inplace=True)

print("Data cleaning completed!")

# -------------------------
# SECTION 3: FRAUD DETECTION
# -------------------------

# Assign fraud risk scores
inpatient_df["fraud_risk_score"] = 0

# Rule 1: High claim amount
inpatient_df.loc[inpatient_df["claim_amount"] > (inpatient_df["claim_amount"].mean() * 2), "fraud_risk_score"] += 3

# Rule 2: Duplicate claims
duplicate_claims = inpatient_df[inpatient_df.duplicated(subset=["claim_id"], keep=False)]
inpatient_df.loc[inpatient_df["claim_id"].isin(duplicate_claims["claim_id"]), "fraud_risk_score"] += 2

# Rule 3: High-frequency providers
high_claim_providers = inpatient_df["provider_id"].value_counts()
fraudulent_providers = high_claim_providers[high_claim_providers > 100].index
inpatient_df.loc[inpatient_df["provider_id"].isin(fraudulent_providers), "fraud_risk_score"] += 2

# Display high-risk claims
high_risk_claims = inpatient_df[inpatient_df["fraud_risk_score"] >= 4]
print(high_risk_claims.head())

# -------------------------
# SECTION 4: DATA VISUALIZATION
# -------------------------

plt.figure(figsize=(10, 5))
sns.histplot(inpatient_df["fraud_risk_score"], bins=10, kde=True)
plt.title("Distribution of Fraud Risk Scores")
plt.xlabel("Fraud Risk Score")
plt.ylabel("Number of Claims")
plt.show()

# -------------------------
# SECTION 5: STREAMLIT DASHBOARD
# -------------------------

def fraud_dashboard():
    st.title("Insurance Fraud Detection Dashboard")
    risk_score_filter = st.slider("Minimum Fraud Risk Score", 0, 10, 3)
    st.subheader("High-Risk Claims")
    st.write(inpatient_df[inpatient_df["fraud_risk_score"] >= risk_score_filter])

# Uncomment to run Streamlit
# fraud_dashboard()

# -------------------------
# SECTION 6: UNIT TESTS
# -------------------------

class TestFraudDetection(unittest.TestCase):
    def test_high_value_claims(self):
        fraud_cases = inpatient_df[inpatient_df["claim_amount"] > 50000]
        self.assertTrue(len(fraud_cases) > 0)
    
    def test_duplicate_claims(self):
        fraud_cases = inpatient_df[inpatient_df.duplicated(subset=["claim_id"], keep=False)]
        self.assertTrue(len(fraud_cases) >= 0)
    
    def test_high_frequency_providers(self):
        fraud_providers = inpatient_df["provider_id"].value_counts()
        self.assertTrue((fraud_providers > 100).any())

if __name__ == "__main__":
    unittest.main()

# -------------------------
# SECTION 7: DATA DICTIONARY
# -------------------------

data_dictionary = """
## Data Dictionary for Insurance Fraud Detection

### Beneficiaries Table
- **bene_id**: Unique identifier for the patient
- **dob**: Date of birth
- **gender**: Gender (M/F)
- **race**: Racial classification
- **chronic_cond_diabetes**: Indicates if patient has diabetes (True/False)

### Inpatient Claims Table
- **claim_id**: Unique claim identifier
- **provider_id**: ID of the provider (hospital/doctor)
- **bene_id**: ID of the patient
- **claim_start_date**: Date of claim initiation
- **claim_amount**: Amount claimed for the procedure
- **fraud_risk_score**: Assigned risk score (0-10)

### Outpatient Claims Table
- **claim_id**: Unique claim identifier
- **claim_amount**: Amount claimed for outpatient services

### Providers Table
- **provider_id**: Unique ID for providers
- **potential_fraud**: Whether provider is suspected of fraud (True/False)
"""

with open("data_dictionary.md", "w") as f:
    f.write(data_dictionary)

print("Data Dictionary Created!")


Data successfully loaded into SQLite database!


  inpatient_df.fillna("Unknown", inplace=True)
  outpatient_df.fillna("Unknown", inplace=True)


Data cleaning completed!


KeyError: 'claim_amount'