# ETL Pipeline for Insurance Fraud Detection

This notebook implements a structured **Extract, Transform, Load (ETL)** pipeline on raw insurance claims data I found on kaggle, link here: [Auto Insurance Claims Fraud dataset](https://www.kaggle.com/datasets/antopravinjohnbosco/auto-insurance-claims-fraud-detection?resource=download). The goal is to clean and prepare the dataset for accurate fraud detection analysis and subsequently load the processed data into a **PostgreSQL** database for efficient querying.

### Pipeline Overview:
- Load raw insurance claims data from CSV
- Conduct thorough data integrity and quality checks
- Handle missing and inconsistent data with appropriate transformations
- Engineer key features to support fraud detection modeling
- Save the cleaned dataset as a reusable **CSV**
- Load cleaned data into a PostgreSQL table (`insurance_claims`)
- Run SQL queries to validate data loading and summarize fraud occurrences

In [1]:
# Import & load essential libraries
import pandas as pd                       # For data manipulation and cleaning
import os                                 # For managing file paths
from sqlalchemy import create_engine      # For database connection to PostgreSQL
from dotenv import load_dotenv            # For loading environment variables from .env file
load_dotenv()                             # Load environment variables from .env file

True

## 1. Define File Paths & PostgreSQL Connection

Set up input/output file locations and connect to my **PostgreSQL** database using **SQLAlchemy**.


### Define File Paths

In [2]:
# Define base project directory
project_dir = r"C:\Users\Cloud\OneDrive\Desktop\Fraud_Analytics_Project"

# Define path to the raw insurance claims CSV dataset
dataset_path = os.path.join(project_dir, "dataset", "insurance_fraud_claims.csv")

# Define directory to save cleaned data outputs
cleaned_dir = os.path.join(project_dir, "data", "cleaned")
os.makedirs(cleaned_dir, exist_ok=True)

# Define path to save cleaned dataset
cleaned_file = os.path.join(cleaned_dir, "cleaned_insurance_claims.csv")

### PostgreSQL Connection

In [3]:
# Load PostgreSQL credentials, mine is stored in a .env file
pg_user = os.getenv("PG_USER")              # Your PostgreSQL username
pg_password = os.getenv("PG_PASSWORD")      # Your password
pg_host = os.getenv("PG_HOST")              # Host where PostgreSQL is running (usually localhost)
pg_port = os.getenv("PG_PORT")              # Default PostgreSQL port
pg_dbname = os.getenv("PG_DBNAME")          # The database you've created in pgAdmin

# Safety check to ensure no credentials are missing
required_env_vars = [pg_user, pg_password, pg_host, pg_port, pg_dbname]
if any(v is None for v in required_env_vars):
    raise ValueError("One or more environment variables are missing. Check your .env file.")

# Establish connection engine to PostgreSQL database for data loading and querying
engine = create_engine(
    f"postgresql+psycopg2://{pg_user}:{pg_password}@{pg_host}:{pg_port}/{pg_dbname}"
)

## 2. Load Raw Insurance Claims Dataset

Read the original dataset into a `pandas` DataFrame and preview its structure.

In [4]:
# Load the raw insurance claims data into a pandas DataFrame
df = pd.read_csv(dataset_path)

# Preview and display first few rows of DataFrame structure
print("Raw data loaded. Shape:", df.shape)
df.head()

Raw data loaded. Shape: (1000, 40)


Unnamed: 0,months_as_customer,age,policy_number,policy_bind_date,policy_state,policy_csl,policy_deductable,policy_annual_premium,umbrella_limit,insured_zip,...,police_report_available,total_claim_amount,injury_claim,property_claim,vehicle_claim,auto_make,auto_model,auto_year,fraud_reported,_c39
0,328,48,521585,2014-10-17,OH,250/500,1000,1406.91,0,466132,...,YES,71610,6510,13020,52080,Saab,92x,2004,Y,
1,228,42,342868,2006-06-27,IN,250/500,2000,1197.22,5000000,468176,...,?,5070,780,780,3510,Mercedes,E400,2007,Y,
2,134,29,687698,2000-09-06,OH,100/300,2000,1413.14,5000000,430632,...,NO,34650,7700,3850,23100,Dodge,RAM,2007,N,
3,256,41,227811,1990-05-25,IL,250/500,2000,1415.74,6000000,608117,...,NO,63400,6340,6340,50720,Chevrolet,Tahoe,2014,Y,
4,228,44,367455,2014-06-06,IL,500/1000,1000,1583.91,6000000,610706,...,NO,6500,1300,650,4550,Accura,RSX,2009,N,


**Quick exploratory check on distribution for missing or unusual values**

In [5]:
# Check distribution of 'collision_type', including missing values
df['collision_type'].value_counts(dropna=False)

# Check distribution of 'incident_type', including missing values
df['incident_type'].value_counts(dropna=False)

# Check distribution of 'collision_type' excluding missing values (default behavior)
df['collision_type'].value_counts()

# Check distribution of 'incident_type' excluding missing values
df['incident_type'].value_counts()

incident_type
Multi-vehicle Collision     419
Single Vehicle Collision    403
Vehicle Theft                94
Parked Car                   84
Name: count, dtype: int64

## 3. Replace Placeholder Values with Standard Missing Marker

Many datasets use placeholders like `"?"`, `"unknown"`, `"n/a"`, or `"missing"` to indicate missing or unclear data.  
To ensure consistent and accurate analysis, I replaced these placeholders with `pandas` standard missing value marker (`pd.NA`).  

This helps downstream processes correctly recognize and handle missing data.

In [6]:
# List of placeholder values commonly used for missing data
placeholders = ['?', 'unknown', 'n/a', 'missing']

# Replace all such placeholders with pandas standard missing value marker pd.NA
df.replace(placeholders, pd.NA, inplace=True)
print("Placeholder values like '?' replaced with pd.NA.\n")

# Normalize 'fraud_reported' for consistent validation and mapping
if 'fraud_reported' in df.columns:
    df['fraud_reported'] = df['fraud_reported'].astype(str).str.strip().str.upper()
    print("Normalized 'fraud_reported' values to uppercase and stripped whitespace.\n")

Placeholder values like '?' replaced with pd.NA.

Normalized 'fraud_reported' values to uppercase and stripped whitespace.



## 4. Normalize All String Columns For Uniformity

Here I need to standardize string based fields to ensure consistency across the dataset. 
String values (like `'yes'`, `'Yes'`, `' YES '`) may represent the same category but appear different to a computer. This step ensures all string columns are uniformly formatted for reliable processing.

In [7]:
# Normalize all string and object columns to ensure consistent formatting
for col in df.select_dtypes(include='object').columns:
    df[col] = df[col].astype(str).str.strip().str.upper()
    
print("Normalized all string columns: stripped whitespace and converted text to uppercase.\n")

Normalized all string columns: stripped whitespace and converted text to uppercase.



## 5. Data Integrity and Validity Checks

In this step, I have performed essential checks to ensure the dataset's quality and reliability before analysis, including:

- Confirming the overall shape and structure of the dataset  
- Identifying any duplicate records  
- Summarizing missing values and their prevalence  
- Verifying data types for each column  
- Reviewing unique values in categorical columns to detect anomalies or typos  
- Validating that key columns, such as `fraud_reported`, contain only expected values `"Y" or "N"`

In [8]:
# Data Integrity Check
print("Initial Integrity Checks")
print("Dataset shape:", df.shape)

# 1. Column names and data types
print("\nColumn Names and Data Types:")
print(df.dtypes)

# 2. Check for duplicate rows
duplicates = df.duplicated().sum()
print(f"\nDuplicate rows: {duplicates}")

# 3. Check for missing values
missing = df.isnull().sum()
missing_percent = (missing / len(df) * 100).round(2)
print("\nMissing Values:")
print(pd.DataFrame({'Missing Count': missing, 'Percent': missing_percent}).loc[missing > 0])

# 4. Unique values in categorical columns
cat_cols = df.select_dtypes(include='object').columns.tolist()
print("\nUnique values in categorical columns:")
for col in cat_cols:
    print(f"{col}: {df[col].nunique()} unique values")
    print(f"  → {df[col].unique()[:5]} ...")  # Display sample of unique values to detect any irregular categories

# 5. Check for expected column presence
expected_cols = ['fraud_reported', 'incident_type', 'total_claim_amount']
missing_cols = [col for col in expected_cols if col not in df.columns]
if missing_cols:
    print(f"\nMissing expected columns: {missing_cols}")
else:
    print("\nAll expected columns are present.")

Initial Integrity Checks
Dataset shape: (1000, 40)

Column Names and Data Types:
months_as_customer               int64
age                              int64
policy_number                    int64
policy_bind_date                object
policy_state                    object
policy_csl                      object
policy_deductable                int64
policy_annual_premium          float64
umbrella_limit                   int64
insured_zip                      int64
insured_sex                     object
insured_education_level         object
insured_occupation              object
insured_hobbies                 object
insured_relationship            object
capital-gains                    int64
capital-loss                     int64
incident_date                   object
incident_type                   object
collision_type                  object
incident_severity               object
authorities_contacted           object
incident_state                  object
incident_city         

In [9]:
# Validate that 'fraud_reported' contains only expected values ('Y' or 'N') to prevent downstream errors
if 'fraud_reported' in df.columns:
    valid_fraud_values = set(df['fraud_reported'].dropna().unique())
    if not valid_fraud_values.issubset({'Y', 'N'}):
        print(f"Invalid values found in 'fraud_reported': {valid_fraud_values}")
    else:
        print("'fraud_reported' column contains only 'Y' and 'N'")

'fraud_reported' column contains only 'Y' and 'N'


## 6. Explore and Inspect Missing Data

Before I clean the dataset, I need to identify which columns contain missing values and how many.  
This helps me decide whether to **fill**, **drop**, or **investigate further** depending on the importance of each column.

In [10]:
# Count missing values for each column
missing = df.isnull().sum()

# Filter to show only columns that have missing values, sorted from most to least
missing = missing[missing > 0].sort_values(ascending=False)

# Create a DataFrame summarizing both the count and percentage of missing values
missing_df = pd.DataFrame({
    'Missing Count': missing,
    'Missing %': (missing / len(df) * 100).round(2)
})

# Display the summary table of missing data
print("Columns with missing values:")
print(missing_df)

Columns with missing values:
      Missing Count  Missing %
_c39           1000      100.0


## 7. Detect and Clean Leading/Trailing Spaces in String Columns

String data can sometimes contain unwanted leading or trailing spaces that cause inconsistencies in analysis and modeling.  
This step performs the following actions:

1. **Detect** any entries in string columns that have leading or trailing whitespace and display examples to help verify the extent of the issue.
2. **Clean** all string columns by stripping these spaces to ensure uniformity.
3. **Verify** the cleanup by displaying sample cleaned values for confirmation.

In [11]:
# Print a message to indicate the start of the process
print("Checking for leading/trailing spaces in string columns...\n")

# Loop through all columns that are of string (object) type
for col in df.select_dtypes(include='object'):
    # Identify entries with leading or trailing whitespace to spot data inconsistencies
    whitespace_mask = df[col].str.match(r'^\s+|\s+$', na=False)
    count = whitespace_mask.sum()  # Count how many entries are affected

    # If this column has entries with leading/trailing spaces, show samples
    if count > 0:
        print(f"Column '{col}' has {count} entries with leading/trailing spaces.")
        print("   Sample problematic values:", df.loc[whitespace_mask, col].unique()[:5])
    else:
        print(f"Column '{col}' is clean (no leading/trailing spaces).")

# Inform when the cleanup is beginning
print("\nCleaning all string columns by stripping leading and trailing spaces...\n")

# Remove unwanted whitespace from all string columns to ensure uniformity
for col in df.select_dtypes(include='object'):
    df[col] = df[col].str.strip()

# Confirm cleanup is complete
print("Cleanup complete. Sample cleaned values for each string column:")

# Show 5 sample cleaned values from each string column
for col in df.select_dtypes(include='object'):
    print(f"{col}: {df[col].unique()[:5]}")

Checking for leading/trailing spaces in string columns...

Column 'policy_bind_date' is clean (no leading/trailing spaces).
Column 'policy_state' is clean (no leading/trailing spaces).
Column 'policy_csl' is clean (no leading/trailing spaces).
Column 'insured_sex' is clean (no leading/trailing spaces).
Column 'insured_education_level' is clean (no leading/trailing spaces).
Column 'insured_occupation' is clean (no leading/trailing spaces).
Column 'insured_hobbies' is clean (no leading/trailing spaces).
Column 'insured_relationship' is clean (no leading/trailing spaces).
Column 'incident_date' is clean (no leading/trailing spaces).
Column 'incident_type' is clean (no leading/trailing spaces).
Column 'collision_type' is clean (no leading/trailing spaces).
Column 'incident_severity' is clean (no leading/trailing spaces).
Column 'authorities_contacted' is clean (no leading/trailing spaces).
Column 'incident_state' is clean (no leading/trailing spaces).
Column 'incident_city' is clean (no le

## 8. Clean the Data + Create Missing Value Flags

In this step, I improve the data quality and prepare the dataset for downstream analysis and modeling by performing several critical actions:

### Key Steps:
- **Drop empty columns**: Remove any columns that are completely missing (all values are `NaN`), as they provide no useful information.
- **Create binary missing flags**: For selected categorical columns (e.g. `collision_type`, `police_report_available`, etc.), generate `_missing_flag` columns that indicate if the original value was missing.  
  This captures potential *informative missingness* — i.e., missing values that may correlate with fraud.
- **Impute non-critical missing values**: Fill missing values in lower-priority columns like `police_report_available` with a default (`'NO'`) to avoid unnecessary row drops.
- **Drop rows missing critical data**: Remove rows that are missing essential categorical features such as `incident_type` or `collision_type`, since these are needed for key analyses.
- **Parse `policy_csl` coverage limits**: This column contains strings like `"100/300"` that represent min/max coverage values. I split this into two new numeric columns:
  - `policy_csl_min`: Minimum liability coverage  
  - `policy_csl_max`: Maximum liability coverage  
  Robust error handling is included to catch and report parsing issues.

By combining all these actions into a single cleanup step, I ensure the dataset is both **analytically reliable** and **ready for modeling** — without losing valuable patterns hidden in missing data.

In [12]:
# Remove any columns that contain no data at all
df.dropna(axis=1, how='all', inplace=True)

# Impute missing values in 'police_report_available' with 'NO' to retain rows
df.fillna({'police_report_available': 'NO'}, inplace=True)

# Store original number of rows before dropping critical missing values
initial_rows = df.shape[0]

# Create binary flags to capture missing values in selected key fields
flag_features = ['collision_type', 'police_report_available', 'property_damage', 'authorities_contacted']
for col in flag_features:
    df[f'{col}_missing_flag'] = df[col].isna().astype(int)

# Drop rows missing values in critical categorical features
df.dropna(subset=['incident_type', 'collision_type'], inplace=True)

# Calculate and report how many rows were dropped
dropped_rows = initial_rows - df.shape[0]
print(f"Rows after dropping missing critical categories: {df.shape[0]} (dropped {dropped_rows} rows)")

# Define a helper function to parse 'policy_csl' coverage limits stored as strings like "100/300"
def parse_policy_csl(csl_str):
    try:
        parts = csl_str.split('/')
        return int(parts[0]), int(parts[1])
    except Exception as e:
        print(f"Warning: could not parse policy_csl value '{csl_str}': {e}")
        return (pd.NA, pd.NA)
    
# Apply parsing function to create separate numeric columns for min and max coverage limits
if 'policy_csl' in df.columns:
    csl_parsed = df['policy_csl'].apply(lambda x: parse_policy_csl(x) if pd.notna(x) else (pd.NA, pd.NA))
    df[['policy_csl_min', 'policy_csl_max']] = pd.DataFrame(csl_parsed.tolist(), index=df.index)
    print("Parsed 'policy_csl' into 'policy_csl_min' and 'policy_csl_max'.")

    # Display sample of parsed coverage limits alongside original values for verification
    print("\nSample of 'policy_csl' parsing results:")
    print(df[['policy_csl', 'policy_csl_min', 'policy_csl_max']].head())

# Display new shape of cleaned dataset after these operations
print("Cleaned data shape:", df.shape)

Rows after dropping missing critical categories: 1000 (dropped 0 rows)
Parsed 'policy_csl' into 'policy_csl_min' and 'policy_csl_max'.

Sample of 'policy_csl' parsing results:
  policy_csl  policy_csl_min  policy_csl_max
0    250/500             250             500
1    250/500             250             500
2    100/300             100             300
3    250/500             250             500
4   500/1000             500            1000
Cleaned data shape: (1000, 45)


## 9. Encode & Format Columns

To make the dataset machine readable, I now convert string based fields into consistent numerical and date formats:

- Convert `fraud_reported` from `"Y"/"N"` to binary `1/0`
- Format `incident_date` as a proper datetime object for time based analysis


In [13]:
# Convert 'fraud_reported' from categorical strings ('Y'/'N') to binary numeric values (1/0)
df['fraud_reported'] = df['fraud_reported'].map({'Y': 1, 'N': 0})
df['fraud_reported'] = df['fraud_reported'].fillna(0).astype(int)

# Convert date columns to proper datetime objects for temporal analysis and sorting
if 'incident_date' in df.columns:
    df['incident_date'] = pd.to_datetime(df['incident_date'], errors='coerce')

# Convert 'policy_bind_date' column to datetime format
if 'policy_bind_date' in df.columns:
    df['policy_bind_date'] = pd.to_datetime(df['policy_bind_date'], errors='coerce')
    print(f"Converted 'policy_bind_date' to datetime with {df['policy_bind_date'].isna().sum()} missing values after conversion.")

Converted 'policy_bind_date' to datetime with 0 missing values after conversion.


## 10. Create a Risk Score Feature

I Introduced a simple rule based `risk_score` to help identify potentially fraudulent claims.  
Each claim earns a "risk point" for meeting one of the following high-risk conditions:

- **High claim amount** (greater than \$10,000)
- **Unusual incident time** (between 12:00 AM and 5:00 AM)
- **No police report submitted**

The final score ranges from `0 = (low risk)` to `3 = (high risk)`.

In [14]:
# Create a basic risk score based on three binary risk conditions
df['risk_score'] = (
    (df['total_claim_amount'] > 10000).astype(int) +                    # 1 point if claim amount exceeds $10,000
    (df['incident_hour_of_the_day'].between(0, 5)).astype(int) +        # 1 point if incident occurred between midnight and 5AM
    (df['police_report_available'].str.upper() == 'NO').astype(int)     # 1 point if no police report was filed
)

# Preview relevant fields used in the risk score calculation
df[['total_claim_amount', 'incident_hour_of_the_day', 'police_report_available', 'risk_score']].head()

Unnamed: 0,total_claim_amount,incident_hour_of_the_day,police_report_available,risk_score
0,71610,5,YES,2
1,5070,8,,0
2,34650,7,NO,2
3,63400,5,NO,3
4,6500,20,NO,1


## 11. Final Sanity Check Before Exporting

Before saving and exporting the cleaned dataset, I perform a final sanity check to verify:

- The overall size of the dataset (number of rows and columns)
- The total number of fraud cases (`fraud_reported = 1`)
- The total number of non-fraud cases (`fraud_reported = 0`)

This helps ensure the data encoding and filtering steps have worked correctly and that the dataset is ready for downstream processes.

In [15]:
# Final sanity check before saving/exporting
print("Final dataset shape:", df.shape)
print("Number of fraud cases:", df['fraud_reported'].sum())
print("Number of non-fraud cases:", (df['fraud_reported'] == 0).sum())

Final dataset shape: (1000, 46)
Number of fraud cases: 247
Number of non-fraud cases: 753


## 12. Save the Cleaned Dataset

Now that I've cleaned and preprocessed the data, I'll save it to a **CSV** file.

Saving a copy ensures:
- You don’t need to re-clean the raw dataset every time.
- The data can be reused for **EDA**, modeling, or dashboarding.
- It creates an auditable snapshot of your cleaned version.

In [16]:
# Save the cleaned DataFrame to a CSV file
df.to_csv(cleaned_file, index=False)

# Confirm the saved file location
print(f"Cleaned dataset saved to:\n{cleaned_file}")

Cleaned dataset saved to:
C:\Users\Cloud\OneDrive\Desktop\Fraud_Analytics_Project\data\cleaned\cleaned_insurance_claims.csv


## 13. Load Cleaned Data into PostgreSQL

After saving the cleaned dataset locally, I load it into a **PostgreSQL** table for further analysis and querying.

This step enables:
- Centralized access to the data via **SQL** tools (e.g., `pgAdmin`, `Tableau`)
- Easier integration with reporting dashboards
- Scalable querying of clean data without relying on raw **CSVs**

The table will be named: `insurance_claims`.
If it already exists, it will be **replaced**.

In [17]:
# Load the cleaned DataFrame into a PostgreSQL table database (my table name is 'insurance_claims')
try:
    df.to_sql("insurance_claims", engine, if_exists="replace", index=False)
    print("Data loaded into PostgreSQL table: 'insurance_claims'")
except Exception as e:
    print(f"Error loading data into PostgreSQL: {e}")

Data loaded into PostgreSQL table: 'insurance_claims'


## 14. Query Fraud vs Non-Fraud Counts

To validate that the data was successfully inserted and to get a quick summary of fraud distribution,  
I will run a simple **SQL** query to count how many claims were reported as fraud (`1`) vs not fraud (`0`).

This helps:
- Verify correct encoding of the `fraud_reported` field
- Understand basic class balance before modeling
- Confirm that data loaded into **PostgreSQL** is accessible and queryable

In [18]:
# SQL query to count the number of claims by fraud label
query = """
SELECT fraud_reported, COUNT(*) as count
FROM insurance_claims
GROUP BY fraud_reported;
"""

# Execute the query and display results
fraud_summary = pd.read_sql(query, engine)
fraud_summary

Unnamed: 0,fraud_reported,count
0,0,753
1,1,247


# ETL Summary

- Raw insurance claim data was cleaned, standardized, and validated  
- Placeholder values and inconsistencies were handled  
- Missing value flags were created for downstream analysis  
- The cleaned dataset was exported locally as a CSV  
- Data was loaded into a PostgreSQL database (`insurance_claims` table via pgAdmin4)  
- A fraud distribution validation query confirmed successful ingestion

---

## Next Steps

- Perform Exploratory Data Analysis in [`eda.ipynb`](./eda.ipynb) to understand fraud patterns  
- Engineer predictive features in [`feature_engineering.ipynb`](./feature_engineering.ipynb)  
- Train machine learning models in [`model_training.ipynb`](./model_training.ipynb)  
- Visualize insights and fraud risks using Tableau in `reporting_dashboard.ipynb`

---

This ETL process forms the foundation of the fraud analytics pipeline by ensuring the dataset is high quality, structured, and ready for in depth analysis and machine learning.