Import the required libraries

In [1]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

Function for loading and preprocessing the data

In [2]:
def load_and_preprocess_data(file_path):
    """Load a transaction file and return a cleaned DataFrame.

    Parameters
    ----------
    file_path : str or os.PathLike
        Path to a ``.csv`` or ``.xlsx`` file. The extension is matched
        case-insensitively, so ``DATA.CSV`` is accepted as well.

    Returns
    -------
    pandas.DataFrame
        The loaded rows with ``date`` parsed to datetime, every row that has
        a missing value in any column removed, and ``amount`` cast to float.
        The original row index is kept (gaps are left where rows were dropped).

    Raises
    ------
    ValueError
        If the file extension is neither ``.csv`` nor ``.xlsx``.
    """
    # str() lets pathlib.Path objects through; lower() makes the extension
    # check case-insensitive.
    normalized_name = str(file_path).lower()
    if normalized_name.endswith('.csv'):
        df = pd.read_csv(file_path)
    elif normalized_name.endswith('.xlsx'):
        df = pd.read_excel(file_path)
    else:
        raise ValueError("Unsupported file format. Please use .csv or .xlsx")

    # Build the cleaned frame with assign() instead of writing columns into
    # the result of dropna(), which avoids chained-assignment warnings.
    return (
        df.assign(date=pd.to_datetime(df['date']))
        .dropna()  # remove rows with missing data in any column
        .assign(amount=lambda frame: frame['amount'].astype(float))
    )

In [3]:
def calculate_statistics(df):
    """Summarise transaction amounts per category.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a ``category`` column and a numeric ``amount`` column.

    Returns
    -------
    pandas.DataFrame
        Indexed by category, with the columns ``mean``, ``median`` and
        ``std`` of ``amount``.
    """
    # No printing here: dumping the whole input frame on every call floods
    # the notebook output and is unrelated to the statistics being returned.
    return df.groupby('category')['amount'].agg(['mean', 'median', 'std'])

In [4]:
def calculate_thresholds(stats, z_score=3):
    """Add upper and lower amount thresholds to per-category statistics.

    Parameters
    ----------
    stats : pandas.DataFrame
        Must contain ``mean`` and ``std`` columns.
    z_score : number, default 3
        How many standard deviations away from the mean the thresholds sit.

    Returns
    -------
    pandas.DataFrame
        A new frame (``stats`` is left unmodified) with ``upper_threshold``
        and ``lower_threshold`` columns added.
    """
    band = z_score * stats['std']
    return stats.assign(
        upper_threshold=stats['mean'] + band,
        lower_threshold=stats['mean'] - band,
    )


The function detect_amount_anomalies identifies outliers in transaction amounts per category using z-scores. It iterates through each category, calculates mean and standard deviation excluding each transaction iteratively, computes z-scores, flags anomalies exceeding a threshold, and returns a DataFrame listing anomaly details.

In [5]:
def detect_amount_anomalies(df, z_score_threshold=3):
    """Flag transactions whose amount is an outlier within their category.

    For every transaction, the mean and standard deviation of the *other*
    transactions in the same category are computed (leave-one-out), and the
    transaction is flagged when the absolute z-score of its amount exceeds
    ``z_score_threshold``.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``transaction_id``, ``date``, ``category`` and
        ``amount`` columns.
    z_score_threshold : number, default 3
        Minimum absolute z-score for a transaction to be reported.

    Returns
    -------
    pandas.DataFrame
        One row per anomaly with the columns ``transaction_id``, ``date``,
        ``category``, ``amount`` and ``reason_for_anomaly``. The frame has
        no columns when nothing is flagged.
    """
    anomalies = []

    for category in df['category'].unique():
        category_df = df[df['category'] == category]
        # Work on a positionally indexed copy of the amounts so that leaving
        # one transaction out removes exactly one row, even when the input
        # index contains duplicate labels.
        amounts = category_df['amount'].reset_index(drop=True)

        for position, (_, row) in enumerate(category_df.iterrows()):
            others = amounts.drop(position)
            mean = others.mean()
            std = others.std()
            deviation = row['amount'] - mean

            # Fewer than two other transactions (std is NaN) or a missing
            # amount: no meaningful z-score, so nothing to flag.
            if pd.isna(std) or pd.isna(deviation):
                continue

            if std == 0:
                # All other transactions are identical. A transaction that
                # matches them is perfectly normal; one that differs is
                # infinitely many standard deviations away and must be
                # flagged rather than silently scored as 0.
                if deviation == 0:
                    continue
                z_score = float('inf') if deviation > 0 else float('-inf')
            else:
                z_score = deviation / std

            if abs(z_score) > z_score_threshold:
                anomalies.append({
                    'transaction_id': row['transaction_id'],
                    'date': row['date'],
                    'category': category,
                    'amount': row['amount'],
                    'reason_for_anomaly': f"Amount anomaly (z-score: {z_score:.2f})"
                })

    return pd.DataFrame(anomalies)

The function detect_frequency_anomalies identifies sudden increases in transaction frequency per category within a DataFrame df. It sorts the DataFrame by date, calculates transaction counts over a rolling window, computes average counts, and flags anomalies where counts exceed a threshold relative to the average. Anomalies are logged with transaction details and returned as a DataFrame.

In [6]:
def detect_frequency_anomalies(df, window=3, threshold_multiplier=2):
    """Flag transactions that fall in an unusually busy period for their category.

    For each category the transactions are counted per calendar day, and for
    every active day the number of transactions in the ``window`` calendar
    days ending on that day is computed. That count is compared with the
    average count observed on the preceding ``window * 2`` active days; all
    transactions of a day whose count exceeds ``threshold_multiplier`` times
    that average are reported.

    The count is taken over a time window rather than over a fixed number of
    rows: a row-based count only depends on the row's position, never on how
    closely the transactions are spaced, so it cannot reveal a burst.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``transaction_id``, ``date``, ``category`` and
        ``amount`` columns. Rows without a date are ignored.
    window : int, default 3
        Length of the counting window in calendar days. The baseline average
        spans the preceding ``window * 2`` active days.
    threshold_multiplier : number, default 2
        A day is flagged when its count is strictly greater than this
        multiple of the baseline average.

    Returns
    -------
    pandas.DataFrame
        One row per flagged transaction with the columns ``transaction_id``,
        ``date``, ``category``, ``amount`` and ``reason_for_anomaly``. The
        frame has no columns when nothing is flagged.
    """
    # A stable sort keeps same-timestamp transactions in their input order.
    df_sorted = df.sort_values('date', kind='mergesort')
    frequency_anomalies = []

    for category in df['category'].unique():
        category_df = df_sorted[df_sorted['category'] == category]
        category_df = category_df[category_df['date'].notna()]
        if category_df.empty:
            continue

        # Calendar day of each transaction, aligned row-for-row with category_df.
        days = pd.to_datetime(category_df['date']).dt.normalize()
        daily_counts = days.value_counts().sort_index()

        # Transactions in the `window` calendar days ending on each active day.
        window_counts = daily_counts.rolling(f'{window}D').sum()
        # Baseline: mean of the counts on the preceding active days. shift(1)
        # keeps the day under test out of its own baseline; the first day has
        # no history (NaN) and therefore never compares as greater.
        avg_counts = window_counts.rolling(window * 2, min_periods=0).mean().shift(1)

        for (_, row), day in zip(category_df.iterrows(), days):
            count = window_counts.loc[day]
            avg = avg_counts.loc[day]
            if count > threshold_multiplier * avg:
                frequency_anomalies.append({
                    'transaction_id': row['transaction_id'],
                    'date': row['date'],
                    'category': category,
                    'amount': row['amount'],
                    'reason_for_anomaly': f"Sudden frequency increase (count: {count}, avg: {avg:.2f})"
                })

    return pd.DataFrame(frequency_anomalies)

The function detect_pattern_anomalies identifies irregular patterns in transaction amounts per category within a DataFrame df. It sorts the DataFrame by date and computes a moving average and standard deviation of transaction amounts over a specified window. Anomalies are detected when the absolute deviation from the moving average exceeds twice the standard deviation. Anomalies are logged with transaction details and returned as a DataFrame.

In [7]:
def detect_pattern_anomalies(df, window=7, std_multiplier=2):
    """Flag transactions that deviate from their category's recent amounts.

    Within each category (ordered by date) a rolling mean and rolling
    standard deviation of ``amount`` are computed over the last ``window``
    transactions, the current one included. A transaction is flagged when
    its absolute deviation from the rolling mean is strictly greater than
    ``std_multiplier`` times the rolling standard deviation.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``transaction_id``, ``date``, ``category`` and
        ``amount`` columns.
    window : int, default 7
        Number of transactions in the rolling window.
    std_multiplier : number, default 2
        Width of the tolerated band in rolling standard deviations.

    Returns
    -------
    pandas.DataFrame
        One row per flagged transaction with the columns ``transaction_id``,
        ``date``, ``category``, ``amount`` and ``reason_for_anomaly``. The
        frame has no columns when nothing is flagged.
    """
    pattern_anomalies = []
    df_sorted = df.sort_values('date')

    for category in df['category'].unique():
        category_df = df_sorted[df_sorted['category'] == category].copy()
        rolling_amounts = category_df['amount'].rolling(window=window, min_periods=0)
        category_df['amount_moving_avg'] = rolling_amounts.mean()
        category_df['amount_std'] = rolling_amounts.std()

        # Vectorised comparison; rows whose rolling std is NaN (a single
        # observation in the window) compare as False and are never flagged.
        deviation = (category_df['amount'] - category_df['amount_moving_avg']).abs()
        flagged = category_df[deviation > std_multiplier * category_df['amount_std']]

        for _, row in flagged.iterrows():
            pattern_anomalies.append({
                'transaction_id': row['transaction_id'],
                'date': row['date'],
                'category': category,
                'amount': row['amount'],
                'reason_for_anomaly': f"Irregular pattern (amount: {row['amount']:.2f}, moving avg: {row['amount_moving_avg']:.2f}, std: {row['amount_std']:.2f})"
            })

    return pd.DataFrame(pattern_anomalies)

Function for generating a detailed report

In [8]:
def generate_report(anomalies):
    """Print a plain-text summary of detected anomalies.

    The report contains the total number of anomalies, a breakdown by
    anomaly type, a breakdown by category, and the full list of anomalies.

    Parameters
    ----------
    anomalies : pandas.DataFrame
        Expected to contain ``reason_for_anomaly`` and ``category`` columns.
        An empty frame (with or without columns) produces only the header
        and a total of 0.

    Returns
    -------
    None
    """
    print("Anomaly Detection Report")
    print("========================")
    print(f"Total anomalies detected: {len(anomalies)}")

    # An empty frame may not even have the expected columns; there is
    # nothing further to break down.
    if anomalies.empty:
        return

    # Each reason reads "<type> (<details>)". Strip the parenthesised details
    # so anomalies are grouped by type instead of every distinct message
    # (with its own numbers) being counted as a separate type.
    anomaly_types = anomalies['reason_for_anomaly'].str.replace(
        r'\s*\(.*$', '', regex=True
    )

    print("\nAnomalies by Type:")
    print(anomaly_types.value_counts().to_string())

    print("\nAnomalies by Category:")
    print(anomalies['category'].value_counts().to_string())

    print("\nDetailed Anomalies:")
    print(anomalies.to_string(index=False))

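The main function runs the functions defined above: it loads and preprocesses the data, runs each anomaly detector, and prints a report of the combined anomalies.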

In [9]:
def main(file_path):
    """Run the full anomaly-detection pipeline on one transaction file.

    Loads and preprocesses the file, runs the amount, frequency and pattern
    detectors, and prints either a report of the combined anomalies (sorted
    by date) or a message saying that none were found.

    Parameters
    ----------
    file_path : str
        Path to the transaction file, passed to ``load_and_preprocess_data``.

    Returns
    -------
    None
    """
    # 1. Data Preprocessing
    df = load_and_preprocess_data(file_path)

    # 2. Anomaly Detection (each detector uses its default settings)
    detector_results = [
        detect_amount_anomalies(df),
        detect_frequency_anomalies(df),
        detect_pattern_anomalies(df),
    ]

    # Only concatenate detectors that found something: passing empty frames
    # to pd.concat relies on behaviour deprecated in pandas 2.1.
    non_empty_results = [result for result in detector_results if not result.empty]

    if not non_empty_results:
        print("No anomalies detected.")
        return

    all_anomalies = pd.concat(non_empty_results, ignore_index=True)

    # Sort anomalies by date if the date column exists
    if 'date' in all_anomalies.columns:
        all_anomalies = all_anomalies.sort_values('date')

    # 3. Reporting
    generate_report(all_anomalies)



# Store the dataset path in file_path

In [10]:
# Location of the transaction dataset, relative to the notebook's working directory.
file_path = "transaction.xlsx"


Run the pipeline on the dataset path defined above

In [11]:
# Run the whole pipeline; the report (or a "no anomalies" message) is printed below.
main(file_path)

Anomaly Detection Report
Total anomalies detected: 1

Anomalies by Type:
Amount anomaly (z-score: 842.16)    1

Anomalies by Category:
Food    1

Detailed Anomalies:
transaction_id       date category  amount               reason_for_anomaly
        TRX004 2024-06-02     Food  3000.0 Amount anomaly (z-score: 842.16)
