In [3]:
import numpy as np
import pandas as pd

In [4]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
df = pd.read_csv("/content/gdrive/MyDrive/CodeShastra/Hackathon Dataset.csv")

In [6]:
def detect_anomalies(row):
    anomalies = []

    # Unauthorized Discount Check
    if row['Discount'] > 0:
        if row['Discount'] > (0.20 * row['Sub_Total']) and row['Assign_To'] not in ["Manager", "SAFIK K"]:
            anomalies.append("Unauthorized_Discount")

    # Tax Miscalculation Check
    expected_tax = (row['CGST_Amount'] + row['SGST_Amount'] + row['VAT_Amount'])
    if abs(row['Tax'] - expected_tax) > 1:  # Allow minor rounding errors
        anomalies.append("Tax_Calculation_Error")

    # Pricing Modification Check
    standard_prices = {"Basmati Rice": 375, "THUMPS UP": 185}  # Example reference prices
    if row['Item_Name'] in standard_prices and row['Price'] != standard_prices[row['Item_Name']]:
        anomalies.append("Price_Modification")

    # Suspicious Transaction Check
    if row['Final_Total'] > 5000 and row['Payment_Type'] == "Cash":
        anomalies.append("High_Value_Cash_Transaction")

    if row['Status'] == "Complimentary" and row['Final_Total'] > 1000:
        anomalies.append("High_Value_Complimentary")

    return ", ".join(anomalies) if anomalies else "Normal"

df['Anomaly_Type'] = df.apply(detect_anomalies, axis=1)

In [7]:
df['Anomaly_Type'].value_counts()

Unnamed: 0_level_0,count
Anomaly_Type,Unnamed: 1_level_1
Tax_Calculation_Error,152505
Normal,48178
"Tax_Calculation_Error, High_Value_Complimentary",1695
"Unauthorized_Discount, Tax_Calculation_Error",1600
High_Value_Complimentary,1523
Unauthorized_Discount,166
"Tax_Calculation_Error, High_Value_Cash_Transaction",111
"High_Value_Cash_Transaction, High_Value_Complimentary",34
"Unauthorized_Discount, Tax_Calculation_Error, High_Value_Cash_Transaction",26
High_Value_Cash_Transaction,18


In [8]:
df['Anomaly_Type'].value_counts().sum()

np.int64(205888)

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
import joblib

# Generate labeled anomaly data (using our previous detection function)
def create_labeled_dataset(df):
    labeled_data = []

    for _, row in df.iterrows():
        anomalies = detect_anomalies(row)

        # Initialize severity to "Low" by default
        severity = "Low"

        for anomaly_type in anomalies.split(", "):  # Split anomalies by comma and space
            if anomaly_type != "Normal":
                # Update severity based on anomaly type
                if anomaly_type in ["High_Value_Cash_Transaction", "High_Value_Complimentary"]:
                    severity = "High"
                elif anomaly_type in ["Unauthorized_Discount", "Price_Modification"]:
                    severity = "Medium"
                elif anomaly_type == "Tax_Calculation_Error":
                    severity = "Low"

                # Construct feature dictionary with severity
                features = {
                    "Final_Total": row["Final_Total"],
                    "Discount_Percentage": (row["Discount"] / row["Sub_Total"]) if row["Sub_Total"] > 0 else 0,
                    "Tax_Discrepancy": abs(row["Tax"] - (row["CGST_Amount"] + row["SGST_Amount"] + row["VAT_Amount"])),
                    "Is_Cash": 1 if row["Payment_Type"] == "Cash" else 0,
                    "Is_Complimentary": 1 if row["Status"] == "Complimentary" else 0,
                    "Price_Variation": abs(row["Price"] - get_standard_price(row["Item_Name"])),
                    "Severity": severity # Assign severity to features dictionary
                }
                labeled_data.append(features)

    return pd.DataFrame(labeled_data)

def get_standard_price(item_name):
    # Reference prices (simplified - should be expanded)
    standard_prices = {
        "Basmati Rice": 375, "THUMPS UP": 185, "Manchow Soup (VEG)": 365,
        "Exotic Stir Fried Vegetable": 795, "Roti": 85
    }
    return standard_prices.get(item_name, 0)

# Create labeled dataset
labeled_df = create_labeled_dataset(df)

In [None]:
# Encode severity labels
severity_mapping = {"Critical": 3, "High": 2, "Medium": 1, "Low": 0}
labeled_df["Severity_Encoded"] = labeled_df["Severity"].map(severity_mapping)

# Select features and target
# Remove 'Fulfillment_Time' as it is not in labeled_df
features = labeled_df[["Final_Total", "Discount_Percentage", "Tax_Discrepancy",
                      "Is_Cash", "Is_Complimentary", "Price_Variation"]]
target = labeled_df["Severity_Encoded"]

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(features, target, test_size=0.2, random_state=42)

In [None]:
# Initialize and train Random Forest Classifier
model = RandomForestClassifier(
    n_estimators=100,
    max_depth=10,
    class_weight="balanced",  # Handle class imbalance
    random_state=42
)

model.fit(X_train, y_train)

# Evaluate
y_pred = model.predict(X_test)
# Get unique class labels from y_test or y_pred
unique_classes = np.unique(y_test)
# Filter target_names to match unique classes
target_names = [name for name, value in severity_mapping.items() if value in unique_classes]
print(classification_report(y_test, y_pred, target_names=target_names))

              precision    recall  f1-score   support

        High       1.00      0.98      0.99     31153
      Medium       0.49      0.99      0.65       365
         Low       0.66      0.99      0.79       732

    accuracy                           0.98     32250
   macro avg       0.72      0.98      0.81     32250
weighted avg       0.99      0.98      0.98     32250



In [None]:
# Save the trained model
joblib.dump(model, "anomaly_severity_classifier.pkl")

# Example prediction function
def predict_severity(transaction_data):
    model = joblib.load("anomaly_severity_classifier.pkl")

    # Prepare features in same order as training
    features = pd.DataFrame([{
        "Final_Total": transaction_data["Final_Total"],
        "Discount_Percentage": transaction_data["Discount"] / transaction_data["Sub_Total"],
        "Tax_Discrepancy": abs(transaction_data["Tax"] - (transaction_data["CGST_Amount"] + transaction_data["SGST_Amount"] + transaction_data["VAT_Amount"])),
        "Is_Cash": 1 if transaction_data["Payment_Type"] == "Cash" else 0,
        "Is_Complimentary": 1 if transaction_data["Status"] == "Complimentary" else 0,
        "Fulfillment_Time": transaction_data["Fulfillment_Time_Min"],
        "Price_Variation": abs(transaction_data["Price"] - get_standard_price(transaction_data["Item_Name"]))
    }])

    prediction = model.predict(features)[0]
    severity_levels = {0: "Low", 1: "Medium", 2: "High", 3: "Critical"}
    return severity_levels[prediction]

In [None]:
# Enhanced anomaly detection with ML severity prediction
def detect_anomalies_with_ml(row):
    anomalies = []

    # First apply rule-based detection
    rule_based_anomalies = detect_anomalies(row)

    # Then predict severity for each
    for anomaly_type, _ in rule_based_anomalies:
        if anomaly_type != "Normal":
            severity = predict_severity(row)
            anomalies.append((anomaly_type, severity))

    return anomalies

In [None]:
# Track model performance over time
def log_performance(X_test, y_test):
    y_pred = model.predict(X_test)
    report = classification_report(y_test, y_pred, output_dict=True)

    # Log to file/database
    with open("model_performance_log.csv", "a") as f:
        f.write(f"{datetime.now()},{report['accuracy']}\n")

# Retrain periodically
def retrain_model(new_data):
    global model
    new_df = pd.read_csv(new_data)
    labeled_df = create_labeled_dataset(new_df)

    X = labeled_df[features.columns]
    y = labeled_df["Severity_Encoded"]

    model.fit(X, y)  # Continue training
    joblib.dump(model, "anomaly_severity_classifier.pkl")