### The goal of this project is to create a fraud detection pipeline that follows certain rules (client requirements):
#### 1. Transactions that are dated before 2000 are highly likely to be a Fraud transaction.
#### 2. Bulk Transactions with a Quantity of more than 500 are highly likely to be a Fraud Transaction.
#### 3. Transactions that cost 0 but are tagged as Complete in the Status column are highly likely to be a Fraud Transaction.
#### 4. Transactions that are tagged as "Unknown" in Payment method are highly likely to be a Fraud transaction.
#### 5. Transactions with "Unknown" status in (transaction_status) column are highly likely to be a Fraud transaction.

In [2]:
import pandas as pd
import numpy as np
from datetime import datetime
from datetime import date
from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy import text
import os
import csv
import logging

In [3]:
# Load environment settings (by default python-dotenv reads a local .env file).
load_dotenv()

server = os.getenv('DB_SERVER')
database = os.getenv('DB_NAME')
driver = os.getenv('DB_DRIVER')

# Fail fast with a readable message: an unset DB_DRIVER would otherwise raise
# AttributeError on None.replace(), and an unset server/database would be
# interpolated into the connection strings as the literal text "None".
_missing_settings = [
    name
    for name, value in (('DB_SERVER', server), ('DB_NAME', database), ('DB_DRIVER', driver))
    if not value
]
if _missing_settings:
    raise RuntimeError(
        f"Missing required environment variable(s): {', '.join(_missing_settings)}"
    )

# Spaces in the ODBC driver name are written as '+' inside the URL query string.
driver = driver.replace(' ', '+')

# First connect to `master` in AUTOCOMMIT mode so the target database can be
# created when it does not exist yet.
master_conn_str = f"mssql+pyodbc://{server}/master?trusted_connection=yes&driver={driver}"
engine_master = create_engine(master_conn_str, isolation_level="AUTOCOMMIT")

with engine_master.connect() as conn:
    # NOTE(review): the database name is interpolated directly into the SQL
    # text, so DB_NAME must come from trusted configuration only.
    conn.execute(text(f"IF DB_ID(N'{database}') IS NULL CREATE DATABASE [{database}]"))
    print(f"Database '{database}' ensured to exist.")

# Engine used by the rest of the notebook, bound to the target database.
target_conn_str = f"mssql+pyodbc://{server}/{database}?trusted_connection=yes&driver={driver}"
engine = create_engine(target_conn_str)

try:
    # Opening the connection is the connectivity check; the context manager
    # closes it on exit, so no explicit close() is needed.
    with engine.connect():
        print("Connected to SQL Server Successfully!")
except Exception as e:
    print("Error Connecting to SQL Server:", e)
    raise

Database 'DB_Financial_Transaction' ensured to exist.
Connected to SQL Server Successfully!


### Logging Functions

In [4]:
# Destination file for the pipeline's log records.
LOG_PATH = "pipeline.log"

# Root logger: INFO and above, each record as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    filename=LOG_PATH,
)

In [5]:
def detect_delimiter(file, sample_size=1024):
    """Guess the field delimiter of a UTF-8 text file.

    Args:
        file: Path of the file to inspect.
        sample_size: Number of characters read from the start of the file
            and handed to ``csv.Sniffer``.

    Returns:
        The delimiter reported by the sniffer, or ``","`` when the sniffer
        cannot determine one.
    """
    with open(file, "r", encoding='utf-8') as handle:
        head = handle.read(sample_size)

    try:
        return csv.Sniffer().sniff(head).delimiter
    except csv.Error:
        # Sniffing failed (e.g. empty sample): fall back to a comma.
        return ","

In [6]:
def read_files(file):
    """Load a CSV or Excel file into a DataFrame with normalized column names.

    Args:
        file: Path to a ``.csv``, ``.xlsx`` or ``.xls`` file. For CSV files
            the delimiter is detected with ``detect_delimiter``.

    Returns:
        The loaded DataFrame with column names converted to text, stripped
        and lower-cased. An empty DataFrame is returned when the extension is
        unsupported or when reading fails, so callers can always test
        ``.empty`` on the result.
    """
    try:
        ext = os.path.splitext(file)[1].lower()

        if ext in ['.xlsx', '.xls']:
            df = pd.read_excel(file)
        elif ext == '.csv':
            delimiter = detect_delimiter(file)
            df = pd.read_csv(file, delimiter=delimiter)
        else:
            print(f"Unsupported file: {file}")
            # Same "nothing loaded" value as the error path below, instead of
            # None, so every failure mode has one return type.
            return pd.DataFrame()

        if df.empty:
            print("File is empty or contains no Data")

        # astype(str) keeps the .str accessor usable when labels are not text
        # (for example numeric headers).
        df.columns = df.columns.astype(str).str.strip().str.lower()
        return df

    except Exception as e:
        # Best effort: report the problem and let the caller see an empty frame.
        print(f"Error reading file '{file}': {e}")
        return pd.DataFrame()

In [7]:
# strptime patterns that try_parse_date attempts in list order; the first
# pattern that parses a value wins. Order therefore decides ambiguous inputs:
# "2025-03-04" matches "%Y-%m-%d" before "%Y-%d-%m" is ever tried.
# data_cleaning rewrites "/" and "." separators to "-" before parsing, which
# is why only dash-separated numeric patterns are listed.
date_formats = [
    "%Y-%m-%d",    # 2025-10-13  (year-month-day)
    "%Y-%d-%m",    # 2025-13-10  (year-day-month)
    "%m-%d-%Y",    # 10-13-2025  (month-day-year)
    "%d-%m-%Y",    # 13-10-2025  (day-month-year)
    "%b %d, %Y",   # Oct 13, 2025      (abbreviated month name)
    "%B %d, %Y"    # October 13, 2025  (full month name)
]

In [8]:
def try_parse_date(date_str):
    """Parse one raw date value, falling back to a 1900-01-01 sentinel.

    Args:
        date_str: The raw value. Missing values (None/NaN/NaT) yield the
            sentinel; values that are already ``datetime``/``date`` objects
            are returned as a ``pandas.Timestamp``; anything else is
            converted to text and matched against ``date_formats`` in order.

    Returns:
        A ``datetime`` for the first matching format, a ``pandas.Timestamp``
        for date-like input, or ``Timestamp("1900-01-01")`` when the value is
        missing or matches no format.
    """
    fallback = pd.to_datetime("1900-01-01")

    if pd.isna(date_str):
        return fallback

    # Already a date: nothing to parse. Without this branch such values would
    # reach .strip() and raise AttributeError, which is not caught below.
    if isinstance(date_str, (datetime, date)):
        return pd.Timestamp(date_str)

    # Normalize once instead of once per candidate format; str() also covers
    # stray non-text values such as numbers.
    text_value = str(date_str).strip()
    for fmt in date_formats:
        try:
            return datetime.strptime(text_value, fmt)
        except (ValueError, TypeError):
            continue
    return fallback

In [9]:
# Column names (already lower-cased by read_files) that run_pipeline treats as
# known; any other column in the input is split off and saved for review.
valid_columns = {
    'transaction_id',
    'transaction_date',
    'customer_id',
    'product_name',
    'quantity',
    'price',
    'payment_method',
    'transaction_status'
}

# Accepted values after cleaning; the cleaning_* helpers replace anything
# outside these lists with 'Unknown'.
valid_product = ['Headphones', 'Coffee Machine', 'Smartphone', 'Laptop', 'Tablet']
valid_payment_methods = ['Paypal', 'Creditcard', 'Cash']
valid_transaction_status = ['Pending', 'Completed', 'Failed']
# Despite the name this is a list: the column order of the cleaned frame that
# data_cleaning returns (including the derived 'total_sales' column).
column_dict = ['transaction_id', 'product_name', 'quantity', 'price', 'total_sales', 'customer_id', 'payment_method', 'transaction_date', 'transaction_status']

In [10]:
# Maps every truncated product name (each proper, non-empty prefix of a full
# name, e.g. "Coffee M" or "Lapt") to the complete product name. Generated
# instead of hand-listed; product order and prefix order match the original
# hand-written table.
mapping = {
    full_name[:cut]: full_name
    for full_name in ("Coffee Machine", "Headphones", "Tablet", "Smartphone", "Laptop")
    for cut in range(1, len(full_name))
}

In [11]:
def cleaning_product_name(product_name_col):
    """Normalize product names to the entries of ``valid_product``.

    Removes every character that is not an ASCII letter or a space,
    title-cases the text, expands truncated names through ``mapping`` and
    labels whatever is still not a valid product as ``'Unknown'``.

    Args:
        product_name_col: Series of raw product names (string values).

    Returns:
        Series holding only valid product names or ``'Unknown'``.
    """
    letters_only = product_name_col.str.replace(r'[^A-Za-z ]', '', regex=True)
    expanded = letters_only.str.title().replace(mapping)
    is_known = expanded.isin(valid_product)
    return expanded.where(is_known, 'Unknown')

In [12]:
def cleaning_quantity(item_quantity):
    """Coerce quantities to non-negative integers.

    Values that cannot be read as numbers become 0, negative values are made
    positive, and the result is cast to ``int`` (dropping any fraction).

    Args:
        item_quantity: Series of raw quantity values.

    Returns:
        Integer Series of cleaned quantities.
    """
    numeric = pd.to_numeric(item_quantity, errors='coerce')
    return numeric.fillna(0).abs().astype(int)

In [13]:
def cleaning_price(price_col):
    """Turn raw price values into non-negative floats rounded to 2 decimals.

    Each value is converted to text, every character other than digits and
    ``.`` is removed (currency symbols, thousands separators, signs), and the
    remainder is parsed as a number. Unparseable values become 0.

    Args:
        price_col: Series of raw price values.

    Returns:
        Float Series of cleaned prices.
    """
    digits_only = price_col.astype(str).str.replace(r'[^0-9.]', '', regex=True)
    amounts = pd.to_numeric(digits_only, errors='coerce').fillna(0)
    return amounts.round(2).abs()

In [14]:
def cleaning_payment(payment_col):
    """Normalize payment methods to the entries of ``valid_payment_methods``.

    Lower-cases the text, removes spaces and title-cases the result (so
    "credit card" becomes "Creditcard"); anything that is then not a valid
    payment method is labelled ``'Unknown'``.

    Args:
        payment_col: Series of raw payment method names (string values).

    Returns:
        Series holding only valid payment methods or ``'Unknown'``.
    """
    lowered = payment_col.str.lower()
    compact = lowered.str.replace(' ', '')
    normalized = compact.str.title()
    recognized = normalized.isin(valid_payment_methods)
    return normalized.where(recognized, 'Unknown')

In [15]:
def cleaning_transaction_status(transaction_status_col):
    """Normalize transaction statuses to ``valid_transaction_status``.

    Lower-cases the text, removes spaces, strips surrounding whitespace and
    title-cases the result. Any value containing "Complete" is rewritten to
    "Completed"; values that are then not a valid status become
    ``'Unknown'``.

    Args:
        transaction_status_col: Series of raw status values (string values).

    Returns:
        Series holding only valid statuses or ``'Unknown'``.
    """
    compact = transaction_status_col.str.lower().str.replace(' ', '')
    normalized = compact.str.strip().str.title()
    # Collapse variants such as "Complete" into the canonical label.
    normalized = normalized.replace(to_replace=r'.*Complete.*', value='Completed', regex=True)
    recognized = normalized.isin(valid_transaction_status)
    return normalized.where(recognized, 'Unknown')

In [16]:
def fraud_detection(df):
    """Score each transaction against the rule-based fraud checks.

    Adds one boolean ``rule_*`` column per rule, a weighted ``fraud_score``
    and a ``possible_fraud`` flag (score of 20 or more). With the current
    weights a bulk quantity alone (5) or an unknown payment method / status
    alone (15 each) stays below the flag threshold.

    Args:
        df: Cleaned transactions containing ``product_name``,
            ``transaction_date``, ``quantity``, ``price``, ``payment_method``
            and ``transaction_status``.

    Returns:
        A new DataFrame with the rule, score and flag columns appended; the
        frame passed in is left untouched.

    Raises:
        KeyError: If one of the required columns is missing.
    """
    # Work on a copy: assigning into the caller's frame mutates it and, when
    # that frame is a slice of another one, may emit SettingWithCopyWarning.
    df = df.copy()

    # Rule-based Fraud Detection
    df['rule_suspicious_product_name'] = ~df['product_name'].isin(valid_product)
    # Compared against an ISO-style date literal; data_cleaning stores dates
    # as 'YYYY-MM-DD' text.
    df['rule_suspicious_date'] = df['transaction_date'] < "2000-01-01"
    df['rule_bulk_quantity'] = df['quantity'] > 500
    df['rule_zero_price_completed'] = (df['price'] == 0) & (df['transaction_status'] == "Completed")
    df['rule_unknown_payment'] = df['payment_method'] == 'Unknown'
    df['rule_unknown_status'] = df['transaction_status'] == 'Unknown'

    # Each triggered rule contributes its weight; booleans act as 0/1.
    df['fraud_score'] = (
        df['rule_suspicious_product_name'] * 20 +
        df['rule_suspicious_date'] * 25 +
        df['rule_bulk_quantity'] * 5 +
        df['rule_zero_price_completed'] * 20 +
        df['rule_unknown_payment'] * 15 +
        df['rule_unknown_status'] * 15
    )

    df['possible_fraud'] = df['fraud_score'] >= 20
    return df

In [25]:
def data_cleaning(df):
    """Clean the raw transaction columns and derive ``total_sales``.

    Rows without ``transaction_id`` or ``customer_id`` are dropped, duplicate
    ``transaction_id`` rows are removed and the rows are sorted by
    ``transaction_id``. Dates are normalized to ``YYYY-MM-DD`` text (values
    that cannot be parsed become ``1900-01-01``), the remaining columns go
    through their ``cleaning_*`` helpers, and ``total_sales`` is computed as
    ``quantity * price``.

    Args:
        df: Raw transactions containing the columns in ``valid_columns``.

    Returns:
        The cleaned DataFrame with columns ordered as in ``column_dict``, or
        an empty DataFrame when a required column is missing or cleaning
        fails for another reason.
    """
    try:
        # Initial row filtering and sorting. The trailing copy() gives this
        # function its own frame, so the column assignments below neither
        # touch the caller's data nor warn about writing to a slice.
        df = (
            df.dropna(subset=['transaction_id', 'customer_id'])
            .drop_duplicates(subset=['transaction_id'])
            .sort_values('transaction_id', ascending=True)
            .copy()
        )

        # Fix transaction_date format
        raw_dates = df['transaction_date']
        if pd.api.types.is_datetime64_any_dtype(raw_dates):
            # Already datetimes: the .str accessor would fail on this column.
            parsed_dates = raw_dates
        else:
            normalized = (
                raw_dates.astype(str)
                .str.strip()
                .str.replace(r"[/.-]", "-", regex=True)
            )
            # to_datetime guarantees a datetime Series (apply() can hand back
            # an object Series, e.g. when there are no rows), so .dt is safe.
            parsed_dates = pd.to_datetime(normalized.apply(try_parse_date), errors='coerce')
        # Anything still missing gets the same sentinel try_parse_date uses.
        df['transaction_date'] = parsed_dates.dt.strftime('%Y-%m-%d').fillna('1900-01-01')

        # More Column Cleaning
        df['product_name'] = cleaning_product_name(df['product_name'])
        df['quantity'] = cleaning_quantity(df['quantity'])
        df['price'] = cleaning_price(df['price'])
        df['payment_method'] = cleaning_payment(df['payment_method'])
        df['transaction_status'] = cleaning_transaction_status(df['transaction_status'])

        df['total_sales'] = df['quantity'] * df['price']

        return df[column_dict]

    except KeyError as e:
        print(f"Missing important Columns: {e}")
        return pd.DataFrame()
    except Exception as e:
        # Not a missing column: report it accurately and keep the traceback
        # in the log instead of mislabelling it.
        logging.exception("Data cleaning failed")
        print(f"Data cleaning failed: {e}")
        return pd.DataFrame()

In [18]:
def save_file(df, folder, prefix):
    """Write ``df`` to a timestamped CSV file and return its path.

    Args:
        df: DataFrame to save (written without the index).
        folder: Target directory; created when it does not exist.
        prefix: Leading part of the file name.

    Returns:
        The path ``<folder>/<prefix>_<YYYY-MM-DD_HH-MM-SS>.csv`` that was
        written.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    path = f"{folder}/{prefix}_{timestamp}.csv"

    # Create the directory before writing into it.
    os.makedirs(folder, exist_ok=True)
    df.to_csv(path, index=False)
    return path

In [None]:
def run_pipeline(file):
    """Run the full load -> clean -> fraud-score -> save pipeline for one file.

    Columns not listed in ``valid_columns`` are written to ``Review_Files``
    for manual review; the cleaned and scored data is written to
    ``Final_Files``. Failures (unreadable file, missing required columns,
    failed cleaning) are logged and end the run without writing a final file.

    Args:
        file: Path of the CSV/Excel file to process.

    Returns:
        None.
    """
    logging.info("🚀 Starting pipeline!")

    df_raw = read_files(file)
    # Guard against None as well as an empty frame so a failed load can
    # never raise AttributeError here.
    if df_raw is None or df_raw.empty:
        logging.error("File is empty or failed to load")
        return

    # Selecting the known columns would raise KeyError if any were absent,
    # so report that explicitly instead.
    missing = sorted(valid_columns - set(df_raw.columns))
    if missing:
        logging.error(f"Missing required columns: {missing}")
        print(f"Missing important Columns: {missing}")
        return

    known = df_raw[list(valid_columns)]
    unknown = df_raw[[c for c in df_raw.columns if c not in valid_columns]]

    if not unknown.empty:
        save_file(unknown, "Review_Files", "Columns_To_Review")

    df_clean = data_cleaning(known)
    # data_cleaning signals failure with an empty frame, which has none of
    # the columns fraud_detection needs.
    if df_clean.empty:
        logging.error("Data cleaning failed or left no rows to score")
        return

    df_final = fraud_detection(df_clean)

    final_path = save_file(df_final, "Final_Files", "Final_Cleaned_Fraud_Data")

    logging.info(f"Pipeline complete → Saved to {final_path}")
    print(f"✔ Pipeline completed successfully.\n📁 Final File: {final_path}")

In [None]:
# Entry point: process the sample dataset when this file is executed directly
# (the path is relative to the current working directory).
if __name__ == "__main__":
    run_pipeline("../DataSet/dirty_financial_transactions.csv")