# In [None]:
import pandas as pd
from datetime import datetime, timedelta

def calculate_thresholds(data):
    """
    This function summarises the "amount" column and derives upper outlier cut-offs from it.

    Args:
        data (pd.DataFrame): Frame exposing an "amount" column.

    Returns:
        dict: "mean" and "std_dev" of the amounts, plus "z_score_threshold"
        (mean + 3 standard deviations) and "iqr_upper_threshold"
        (third quartile + 1.5 interquartile ranges).
    """
    amounts = data["amount"]

    average = amounts.mean()
    spread = amounts.std()

    lower_quartile = amounts.quantile(0.25)
    upper_quartile = amounts.quantile(0.75)
    interquartile_range = upper_quartile - lower_quartile

    thresholds = {}
    thresholds["mean"] = average
    thresholds["std_dev"] = spread
    # Upper bound of the "three sigma" band around the mean.
    thresholds["z_score_threshold"] = average + 3 * spread
    # Upper fence: 1.5 interquartile ranges above the third quartile.
    thresholds["iqr_upper_threshold"] = upper_quartile + 1.5 * interquartile_range
    return thresholds

def calculate_mean_frequency(daily_category_freq):
    """
    This function averages the per-group transaction counts for each category.

    Args:
        daily_category_freq (pd.DataFrame): Frame with a "category" column and
            numeric columns (e.g. "count") to average; other columns are ignored.

    Returns:
        pd.DataFrame: Indexed by category, holding the mean of every numeric column.
    """
    # Only numeric columns are averaged: the frame may also carry a "date" column,
    # and averaging non-numeric data (e.g. dates kept as strings) raises TypeError
    # on pandas >= 2.0.
    mean_freq = daily_category_freq.groupby("category").mean(numeric_only=True)
    return mean_freq

def is_zscore_outlier(transaction, category_data):
    """
    This function checks whether a transaction amount exceeds the category's z-score threshold.

    Args:
        transaction: Record providing an "amount" entry.
        category_data: Category statistics providing a "z_score_threshold" entry.

    Returns:
        Result of the comparison amount > z-score threshold.
    """
    amount = transaction["amount"]
    cutoff = category_data["z_score_threshold"]
    return amount > cutoff

def is_high_amount_outlier(transaction, category_data, multiplier=2):
    """
    This function checks whether a transaction amount exceeds a multiple of the category mean.

    Args:
        transaction: Record providing an "amount" entry.
        category_data: Category statistics providing a "mean" entry.
        multiplier (optional): Factor applied to the category mean to obtain the
            cut-off (default: 2, the previously hard-coded value).

    Returns:
        Result of the comparison amount > multiplier * mean.
    """
    cutoff = multiplier * category_data["mean"]
    return transaction["amount"] > cutoff

def is_iqr_outlier(transaction, category_data):
    """
    This function checks whether a transaction amount exceeds the category's upper IQR threshold.

    Args:
        transaction: Record providing an "amount" entry.
        category_data: Category statistics providing an "iqr_upper_threshold" entry.

    Returns:
        Result of the comparison amount > IQR upper threshold.
    """
    amount = transaction["amount"]
    upper_fence = category_data["iqr_upper_threshold"]
    return amount > upper_fence

def get_historical_transactions(transactions, category, date, window=7):
    """
    This function selects the transactions of one category that fall in the look-back window before a date.

    Args:
        transactions (pd.DataFrame): DataFrame with "category" and "date" columns.
        category (str): Category whose history is requested.
        date (datetime): Reference date; the window ends just before it.
        window (int, optional): Length of the look-back window in days (default: 7).

    Returns:
        pd.DataFrame: Rows of the category dated within [date - window days, date).
    """
    window_start = date - timedelta(days=window)

    dates = transactions["date"]
    same_category = transactions["category"] == category
    on_or_after_start = dates >= window_start
    # The reference date itself is excluded, so only strictly earlier rows qualify.
    before_reference = dates < date

    selected = same_category & on_or_after_start & before_reference
    return transactions.loc[selected]

def is_time_series_outlier(transaction, transactions, window_size=7):
    """
    This function flags a transaction whose amount is well above the recent average of its category.

    Args:
        transaction: Record providing "category", "date" and "amount" entries.
        transactions (pd.DataFrame): DataFrame containing all transaction data.
        window_size (int): Number of days of history to compare against (default=7).

    Returns:
        bool: True if the amount exceeds the window mean plus two standard
        deviations, False otherwise (including when there is no history).
    """
    history = get_historical_transactions(
        transactions, transaction["category"], transaction["date"], window_size
    )

    # Without any earlier transactions in the window there is no baseline to compare to.
    if history.empty:
        return False

    past_amounts = history["amount"]
    window_mean = past_amounts.mean()
    window_std = past_amounts.std()

    # Two standard deviations above the window mean; adjust as needed.
    cutoff = window_mean + 2 * window_std
    return transaction["amount"] > cutoff

def _build_anomaly_record(row, reason, category_anomaly):
    """Return the report entry for one flagged transaction row."""
    return {
        "transaction_id": row["transaction_id"],
        "date": row["date"].strftime("%Y-%m-%d"),
        "category": row["category"],
        "amount": row["amount"],
        "reason_for_anomaly": reason,
        "category_anomaly": category_anomaly,
    }


def _classify_anomaly(row, category_data, daily_freq, mean_freq, transactions):
    """Return (reason, category_anomaly) for the first rule the row trips, or None."""
    if daily_freq > mean_freq:
        return "High frequency anomaly", True
    if is_zscore_outlier(row, category_data):
        return "Z-score anomaly", False
    if is_high_amount_outlier(row, category_data):
        return "Unusually high transaction amount", False
    if is_iqr_outlier(row, category_data):
        return "IQR anomaly", False
    # Evaluated last and only when nothing else matched: it filters the whole
    # transaction frame for this row.
    if is_time_series_outlier(row, transactions):
        return "Time series anomaly", False
    return None


def detect_anomalies(data_file):
    """
    This function reads transaction data, flags anomalous transactions and prints a report.

    Each transaction is tested against the rules below in order and reported
    with the first one that matches:
        1. its (date, category) count exceeds the category's mean count,
        2. z-score threshold, 3. twice the category mean,
        4. upper IQR threshold, 5. recent-history (time series) threshold.

    Args:
        data_file (str): Path to the CSV file containing transaction data. The
            code reads the columns "transaction_id", "date", "category" and "amount".

    Returns:
        None. The report (or "No anomalies detected.") is written to stdout.
    """
    transactions = pd.read_csv(data_file)

    # Convert date column to datetime format
    transactions["date"] = pd.to_datetime(transactions["date"])

    # Group transactions by category and calculate statistics
    category_stats = transactions.groupby("category").apply(calculate_thresholds).to_dict()

    # Transaction frequency per (date, category) pair.
    # NOTE(review): this groups on the exact "date" value; if the column carries
    # times of day the counts are per timestamp rather than per day -- confirm.
    daily_counts = transactions.groupby(["date", "category"]).size()
    daily_category_freq = daily_counts.to_frame(name="count").reset_index()
    mean_category_freq = calculate_mean_frequency(daily_category_freq)

    # Plain dict lookups replace re-filtering the frequency frame for every row.
    daily_freq_by_key = daily_counts.to_dict()
    mean_freq_by_category = mean_category_freq["count"].to_dict()

    # Detect anomalies and create report data
    anomaly_report = []
    for _, row in transactions.iterrows():
        category = row["category"]
        verdict = _classify_anomaly(
            row,
            category_stats[category],
            daily_freq_by_key[(row["date"], category)],
            mean_freq_by_category[category],
            transactions,
        )
        if verdict is not None:
            reason, category_anomaly = verdict
            anomaly_report.append(_build_anomaly_record(row, reason, category_anomaly))

    # Print anomaly report
    if anomaly_report:
        print("Anomaly Report:")
        for anomaly in anomaly_report:
            print(anomaly)
    else:
        print("No anomalies detected.")

# Run the detection on the sample dataset; the path below must point at an existing CSV file.
detect_anomalies(data_file="/content/dummy_transactions.csv")
