## <u>Anomaly Detection</u>
###### After the data has been ingested into the **PesaTransactionsV1 Lakehouse**, the project flow is divided into the following main steps:

1) Train the model and test the data for anomalies.
2) Use OpenAI GPT-4o mini to assign each anomaly a risk level (low/medium/high), with an explanation of the assessment and the alert message that will be sent to the risk analysts.
3) Connect to Azure Blob Storage and upload a `.txt` file (containing the alert message and key details) for each anomaly found.

After the last step, Power Automate takes over. We've set up a trigger that listens for files being added or modified in the blob storage container, followed by steps that email the alert message to the risk analysts.

##### **STEP 1:** Train the model and test the data for anomalies.

###### Important Notes:
- The features selected for the machine learning model include:
    - **Account Numbers** - Unique identifiers.
    - **Transaction Posting Date** - The day of the transaction.
    - **Transaction Time** - The exact time the transaction happened.
    - **Transaction Amount**
    - **Rolling Average** - The mean amount of the last three transactions for each account number/user.
    - **Previous Transaction Time** - The time of the same user's previous transaction.
    - **Is Transaction the first** - Whether this is the first transaction for a particular user.
    - **Time since last transaction** - Time in seconds between a transaction and the previous one from the same user/account (`-1` for a first transaction).
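
To make the per-account features above concrete, here is a minimal sketch on toy data. The account labels, timestamps, and amounts are made up for illustration; only the column names follow the notebook.

```python
import pandas as pd

# Toy data: two hypothetical accounts with already-parsed timestamps.
toy = pd.DataFrame({
    "AccountNumber": ["A", "A", "A", "B"],
    "datetime": pd.to_datetime([
        "2021-12-03 10:00:00",
        "2021-12-03 10:02:00",
        "2021-12-03 10:05:00",
        "2021-12-03 09:00:00",
    ]),
    "Amount": [1000, 3000, 5000, 700],
}).sort_values(["AccountNumber", "datetime"])

# Mean of the current and up to two previous amounts for the same account.
toy["rolling_avg_amt"] = (
    toy.groupby("AccountNumber")["Amount"]
    .rolling(window=3, min_periods=1)
    .mean()
    .reset_index(level=0, drop=True)
)

# Timestamp of the previous transaction from the same account (NaT if none).
toy["prev_txn_time"] = toy.groupby("AccountNumber")["datetime"].shift(1)
toy["is_first_txn"] = toy["prev_txn_time"].isna().astype(int)

# Seconds since the previous transaction; -1 marks a first transaction.
toy["time_since_last"] = (
    (toy["datetime"] - toy["prev_txn_time"]).dt.total_seconds().fillna(-1)
)

print(toy[["AccountNumber", "Amount", "rolling_avg_amt", "is_first_txn", "time_since_last"]])
```

Sorting by account and time before computing the features matters, because both `shift(1)` and the rolling window depend on row order within each account.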

In [66]:
# Read the data and initiate the DataFrame.
"""
- For this, there are two ways; 
    - the first being reading from the table "PesaTransactions".
    - the second being reading it from the csv file "PesaTransactionsToday.csv"
"""
import pandas as pd

# df = spark.read.table("PesaTransactionsV1.PesaTransactions")
# df = df.toPandas()

df = pd.read_csv('/lakehouse/default/Files/PesaTransactionsToday.csv')
df.head()



Unnamed: 0,AccountNo,Time,Amount,PostingDate
0,XX-01400-XX,13:29:01,1000,10/11/2018
1,XX-93316-XX,5:40:57,20000,10/12/2018
2,XX-06984-XX,4:23:17,5000,10/13/2018
3,XX-57152-XX,7:45:45,10000,10/13/2018
4,XX-91706-XX,14:17:58,7000,10/13/2018


In [67]:
# Select and prepare the relevant features:
import pandas as pd
import numpy as np

df["AccountNumber"] = df["AccountNo"].str.extract(r"-(\d+)-")
df['datetime'] = pd.to_datetime(df['PostingDate'] + ' ' + df['Time'])
df['hour'] = df['datetime'].dt.hour
df['day_of_week'] = df['datetime'].dt.dayofweek  # 0=Monday
df['day'] = df['datetime'].dt.day
df['month'] = df['datetime'].dt.month
df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)

# Total number of transactions per member
member_txn_count = df.groupby('AccountNumber')['Amount'].count().rename('txn_count')

# Merge back
df = df.merge(member_txn_count, on='AccountNumber', how='left')
df = df.sort_values(by=['AccountNumber', 'datetime'])

# Rolling average amount per member (last 3 transactions)
df['rolling_avg_amt'] = df.groupby('AccountNumber')['Amount'].rolling(window=3, min_periods=1).mean().reset_index(0, drop=True)
df['prev_txn_time'] = df.groupby('AccountNumber')['datetime'].shift(1)
df['is_first_txn'] = df['prev_txn_time'].isna().astype(int)
df['time_since_last'] = (df['datetime'] - df['prev_txn_time']).dt.total_seconds()
df['time_since_last'] = df['time_since_last'].fillna(-1)  # -1 marks a first transaction (no previous one)
df['log_amount'] = np.log1p(df['Amount'])  # handles zero safely

df.head()


Unnamed: 0,AccountNo,Time,Amount,PostingDate,AccountNumber,datetime,hour,day_of_week,day,month,is_weekend,txn_count,rolling_avg_amt,prev_txn_time,is_first_txn,time_since_last,log_amount
38926,XX-00238-XX,14:32:44,5000,11/24/2021,238,2021-11-24 14:32:44,14,2,24,11,0,3,5000.0,NaT,1,-1.0,8.517393
39756,XX-00238-XX,10:25:56,1000,12/3/2021,238,2021-12-03 10:25:56,10,4,3,12,0,3,3000.0,2021-11-24 14:32:44,0,762792.0,6.908755
39758,XX-00238-XX,10:27:38,26000,12/3/2021,238,2021-12-03 10:27:38,10,4,3,12,0,3,10666.666667,2021-12-03 10:25:56,0,102.0,10.16589
35766,XX-00402-XX,17:36:44,20000,10/19/2021,402,2021-10-19 17:36:44,17,1,19,10,0,3,20000.0,NaT,1,-1.0,9.903538
35767,XX-00402-XX,17:38:32,3000,10/19/2021,402,2021-10-19 17:38:32,17,1,19,10,0,3,11500.0,2021-10-19 17:36:44,0,108.0,8.006701


In [68]:
# Determine the features

from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler, LabelEncoder

encoder = LabelEncoder()
df['AccountNumberEncoded'] = encoder.fit_transform(df['AccountNumber'])

feature_cols = [
    'log_amount',
    'AccountNumberEncoded',
    'hour',
    'day_of_week',
    'is_weekend',
    'txn_count',
    'rolling_avg_amt',
    'time_since_last',
    'is_first_txn'
]

# Subset features
features = df[feature_cols]

# Verify the new column is added
print(df[['AccountNumber', 'AccountNumberEncoded']].head())


      AccountNumber  AccountNumberEncoded
38926         00238                     0
39756         00238                     0
39758         00238                     0
35766         00402                     1
35767         00402                     1
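
As a small illustration of what the encoding above does, the sketch below fits a `LabelEncoder` on a few made-up account strings and maps the codes back. It suggests that the already fitted encoder can recover the original values through `inverse_transform`, so a second encoder should not be necessary.

```python
from sklearn.preprocessing import LabelEncoder

# Illustrative account strings (not taken from the real dataset).
accounts = ["00402", "00238", "00402", "74804"]

encoder = LabelEncoder()
codes = encoder.fit_transform(accounts)

# classes_ holds the sorted unique labels; each code is an index into it.
print(encoder.classes_)
print(codes)

# Map the integer codes back to the original strings.
restored = encoder.inverse_transform(codes)
print(restored)
```

Note that the integer codes carry an arbitrary ordering of the accounts, which a tree-based model will treat as a numeric feature like any other.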


In [None]:
# Split data for training and evaluation
from sklearn.model_selection import train_test_split

train_data, test_data = train_test_split(features, test_size=0.2, random_state=42)

# Standardize numeric features
scaler = StandardScaler()
#X_scaled = scaler.fit_transform(features)
X_train_scaled = scaler.fit_transform(train_data)
X_test_scaled = scaler.transform(test_data)


In [70]:
# Train the Isolation Forest model on the data
from sklearn.ensemble import IsolationForest

model = IsolationForest(n_estimators=100, contamination=0.01, random_state=42)
model.fit(X_train_scaled)


In [71]:
# Predict anomalies on the scaled test data
y_pred = model.predict(X_test_scaled)

# Convert the predictions (1 = normal, -1 = anomaly) to a more interpretable format
y_pred = [1 if pred == 1 else 0 for pred in y_pred]  # 1 = normal, 0 = anomaly

# Add predictions to the test set for comparison
# (work on an explicit copy to avoid pandas' SettingWithCopyWarning)
test_data = test_data.copy()
test_data.loc[:, 'predicted_anomaly'] = y_pred

import numpy as np
test_data.loc[:, 'Status'] = np.where(test_data['predicted_anomaly'] == 0, 'Anomaly', 'Normal')

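
Beyond the binary label, an Isolation Forest also exposes a continuous score that can be used to rank flagged rows. The following is a minimal sketch on synthetic data (the random points stand in for the scaled features and are purely an assumption for illustration); it uses the same hyperparameters as the cell above.

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(42)

# Synthetic stand-in for scaled features: 500 typical points plus 5 distant ones.
typical = rng.normal(loc=0.0, scale=1.0, size=(500, 2))
distant = rng.normal(loc=8.0, scale=0.5, size=(5, 2))
X = np.vstack([typical, distant])

model = IsolationForest(n_estimators=100, contamination=0.01, random_state=42)
model.fit(X)

labels = model.predict(X)            # 1 = normal, -1 = anomaly
scores = model.decision_function(X)  # below zero means the row is flagged

# Row positions ordered from most to least anomalous.
ranked = np.argsort(scores)

print("rows flagged:", int((labels == -1).sum()))
print("most anomalous row positions:", ranked[:5])
```

The `contamination` value sets the score threshold, so roughly that fraction of the training data ends up flagged regardless of whether it is truly fraudulent.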

In [72]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Recover the original AccountNumber values using the encoder fitted earlier
# (a second LabelEncoder is not needed)
df['AccountNumber_original'] = encoder.inverse_transform(df['AccountNumberEncoded'])
# Copy the test set and pull the matching rows from df by index
result_df = test_data.copy()
result_df['Amount'] = df.loc[result_df.index, 'Amount']
result_df['AccountNumber'] = df.loc[result_df.index, 'AccountNumber']
result_df['AccountNumber_original'] = df.loc[result_df.index, 'AccountNumber_original']


# Initialize Spark session
spark = SparkSession.builder \
    .appName("AnomalyDetection") \
    .getOrCreate()

# Convert to PySpark DataFrame
spark_df = spark.createDataFrame(result_df)

# Cast 'Amount' to integer type
spark_df = spark_df.withColumn('Amount', col('Amount').cast('int'))
spark_df.write \
    .mode('overwrite') \
    .option('overwriteSchema', 'true') \
    .saveAsTable('PesaTransactionsV1.Anomaly_Results')
# Verify the data in the table
spark.sql("SELECT * FROM PesaTransactionsV1.Anomaly_Results").show()


+------------------+--------------------+----+-----------+----------+---------+------------------+---------------+------------+-----------------+------+------+-------------+----------------------+
|        log_amount|AccountNumberEncoded|hour|day_of_week|is_weekend|txn_count|   rolling_avg_amt|time_since_last|is_first_txn|predicted_anomaly|Status|Amount|AccountNumber|AccountNumber_original|
+------------------+--------------------+----+-----------+----------+---------+------------------+---------------+------------+-----------------+------+------+-------------+----------------------+
|10.308985993422082|                 494|  15|          2|         0|       29|14533.333333333334|      1821633.0|           0|                1|Normal| 30000|        44324|                 44324|
| 6.803505257608338|                 848|  10|          1|         0|      136|            1400.0|      2129169.0|           0|                1|Normal|   900|        74804|                 74804|
| 7.43897159239

In [73]:
# If the Status column is already in the table, drop it first (overwriting it in place would not work).

from pyspark.sql.functions import col, when

# Drop the 'status' column in the Spark DataFrame if it exists
if "Status" in spark_df.columns:
    spark_df = spark_df.drop("Status")

# Add the new 'status' column based on 'predicted_anomaly'
spark_df = spark_df.withColumn("Status", 
                               when(col("predicted_anomaly") == 0, "Anomaly")
                               .otherwise("Normal"))


# Overwrite the data in the target lakehouse table
spark_df.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("PesaTransactionsV1.Anomaly_Results")


# Verify the data in the table
spark.sql("SELECT * FROM PesaTransactionsV1.Anomaly_Results").show()


+------------------+--------------------+----+-----------+----------+---------+------------------+---------------+------------+-----------------+------+-------------+----------------------+------+
|        log_amount|AccountNumberEncoded|hour|day_of_week|is_weekend|txn_count|   rolling_avg_amt|time_since_last|is_first_txn|predicted_anomaly|Amount|AccountNumber|AccountNumber_original|Status|
+------------------+--------------------+----+-----------+----------+---------+------------------+---------------+------------+-----------------+------+-------------+----------------------+------+
|10.308985993422082|                 494|  15|          2|         0|       29|14533.333333333334|      1821633.0|           0|                1| 30000|        44324|                 44324|Normal|
| 6.803505257608338|                 848|  10|          1|         0|      136|            1400.0|      2129169.0|           0|                1|   900|        74804|                 74804|Normal|
| 7.43897159239

In [74]:
# Keep only the rows flagged as anomalies
import pandas as pd

df_anomalies = spark.read.table("Anomaly_Results").filter("Status = 'Anomaly'").toPandas()


In [75]:
# Verify the DataFrame columns:
print(df_anomalies.columns.tolist())


['log_amount', 'AccountNumberEncoded', 'hour', 'day_of_week', 'is_weekend', 'txn_count', 'rolling_avg_amt', 'time_since_last', 'is_first_txn', 'predicted_anomaly', 'Amount', 'AccountNumber', 'AccountNumber_original', 'Status']


##### **STEP 2:** Use OpenAI GPT-4o mini to assign each anomaly a risk level (low/medium/high), with an explanation of the assessment and the alert message that will be sent to the risk analysts.

- At this point, a Key Vault has already been created in the Azure portal to store the OpenAI endpoint URL and the OpenAI key.

In [76]:
# Function to generate the prompt to send to GPT-4o mini for each anomaly
def format_prompt(transaction):
    return f"""
        A transaction has been flagged by an anomaly detection system. 
        Details:
        - Anomaly: {transaction["Status"]}
        - Amount: KES{transaction["Amount"]}
        - Account No: {transaction["AccountNumber"]}

        Evaluate this transaction and assign a risk level (low, medium, high) with
        a short explanation. Provide a final alert message in plain English for a 
        fraud analyst.

        Add three columns for your output:
        1. RiskLevel.
        2. Explanation.
        3. AlertMessage
    """


In [None]:
# Run this in a notebook cell before your imports
!pip install openai==0.28.1 typing-extensions==4.5.0 pydantic==1.10.13 notebookutils

In [78]:
# Call GPT4o mini
import openai
from notebookutils.mssparkutils.credentials import getSecret
import re
import pandas as pd
import json


# Constants
KEY_VAULT_ENDPOINT = "https://pesatransactionskeyvault.vault.azure.net/"
AZURE_OPENAI_API_KEY = getSecret(KEY_VAULT_ENDPOINT, "openaiGPT4o-mini-key")
AZURE_OPENAI_ENDPOINT_URL = getSecret(KEY_VAULT_ENDPOINT, "openaiendpointURL")

openai.api_type = "azure"
openai.api_base = AZURE_OPENAI_ENDPOINT_URL
openai.api_version = "2025-03-01-preview"
openai.api_key = AZURE_OPENAI_API_KEY

def gpt4o_risk_evaluation(prompt):
    response = openai.ChatCompletion.create(
        engine="gpt-4o-mini-kenya-hack",
        messages=[
            {"role": "system", "content": """You are a fraud assistant. Always respond in valid JSON format with these exact keys:
            {
                "risk_level": "low/medium/high",
                "explanation": "your explanation here",
                "alert_message": "your alert message here"
            }"""},
            {"role": "user", "content": prompt}
        ],
        response_format={"type": "json_object"},
        temperature=0.7
    )
    return json.loads(response.choices[0].message["content"])

def parse_gpt4_response(response):
    """Convert the parsed JSON response (a dict) into a pandas Series of output columns."""
    risk_level = response["risk_level"]
    explanation = response["explanation"]
    alert_message = response["alert_message"]
    
    return pd.Series({
        'RiskLevel': risk_level,
        'Explanation': explanation,
        'AlertMessage': alert_message
    })

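
Even with a JSON response format requested, the model may occasionally return an unexpected key or risk value. The sketch below shows one possible way to check a parsed response before it reaches the DataFrame. The helper name `validate_risk_response` and the sample payload are hypothetical and not part of the original notebook.

```python
import json

ALLOWED_RISK_LEVELS = {"low", "medium", "high"}
REQUIRED_KEYS = ("risk_level", "explanation", "alert_message")

def validate_risk_response(payload: dict) -> dict:
    """Return a cleaned copy of the payload, or raise ValueError if it is malformed."""
    missing = [key for key in REQUIRED_KEYS if key not in payload]
    if missing:
        raise ValueError(f"Response is missing keys: {missing}")

    # Normalise the risk level so "Medium" and " medium " are treated alike.
    risk = str(payload["risk_level"]).strip().lower()
    if risk not in ALLOWED_RISK_LEVELS:
        raise ValueError(f"Unexpected risk_level: {payload['risk_level']!r}")

    return {
        "risk_level": risk,
        "explanation": str(payload["explanation"]).strip(),
        "alert_message": str(payload["alert_message"]).strip(),
    }

# Hypothetical response text, used only to exercise the helper.
sample = json.loads(
    '{"risk_level": "Medium", '
    '"explanation": "Amount is above the recent average.", '
    '"alert_message": "Please review this transaction."}'
)
print(validate_risk_response(sample))
```

A check like this could sit between `gpt4o_risk_evaluation` and `parse_gpt4_response`, so that a malformed reply fails with a clear message instead of a `KeyError`.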

In [79]:
# Loop through the anomalies and evaluate each
"""
- The intention for this is to come up with a table with the original data, but with additional columns for the risk level,
the explanation as to why this is an anomaly, and the alert message to be sent to analysts.
"""

response_series = df_anomalies.apply(
    lambda row: parse_gpt4_response(gpt4o_risk_evaluation(format_prompt(row))), 
    axis=1
)

# Add the parsed columns to your DataFrame
df_anomalies[['RiskLevel', 'Explanation', 'AlertMessage']] = response_series

# Show what we have so far:
df_anomalies.head()


Unnamed: 0,log_amount,AccountNumberEncoded,hour,day_of_week,is_weekend,txn_count,rolling_avg_amt,time_since_last,is_first_txn,predicted_anomaly,Amount,AccountNumber,AccountNumber_original,Status,RiskLevel,Explanation,AlertMessage
0,9.615872,260,16,5,1,69,15200.0,11316140.0,0,0,15000,24498,24498,Anomaly,medium,The transaction amount of KES15000 is signific...,A transaction of KES15000 has been flagged as ...
1,8.2943,1044,18,5,1,68,2333.333333,14991315.0,0,0,4000,91126,91126,Anomaly,medium,The transaction amount of KES4000 is moderate ...,A transaction of KES4000 from account number 9...
2,10.308986,1138,15,1,0,32,30000.0,-1.0,1,0,30000,98588,98588,Anomaly,medium,The transaction amount of KES30000 is signific...,Please review the transaction of KES30000 on a...
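
Because the loop above makes one API call per anomaly, a single transient failure would abort the whole `apply`. One common mitigation is a retry wrapper with exponential backoff. The sketch below is an assumption about how that could look: `call_with_retry` and `flaky_evaluation` are hypothetical names, and the flaky function is only a stand-in for the real API call.

```python
import time

def call_with_retry(func, *args, max_attempts=3, base_delay=1.0, **kwargs):
    """Call func, retrying on any exception with exponentially growing delays."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            if attempt == max_attempts:
                raise  # give up after the final attempt
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.2f}s")
            time.sleep(delay)

# Stand-in for the real API call: fails twice, then succeeds.
calls = {"n": 0}

def flaky_evaluation(prompt):
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("temporary failure")
    return {"risk_level": "low", "explanation": "ok", "alert_message": prompt}

result = call_with_retry(flaky_evaluation, "test prompt", base_delay=0.01)
print(result)
```

In the notebook, the wrapper could be applied as `call_with_retry(gpt4o_risk_evaluation, format_prompt(row))` inside the lambda.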


In [80]:
# Save results back to the lakehouse but in a new table
from pyspark.sql import SparkSession

spark_df = spark.createDataFrame(df_anomalies)

spark_df.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .saveAsTable("Anomaly_Risk_Summary")


##### **STEP 3:** Connect to Azure Blob Storage and upload a `.txt` file (containing the alert message and key details) for each anomaly found.

- At this point, a Storage Account has already been created on Azure, along with the blob container that we'll use to store the uploaded files.
- The final cell in this section logs whether the connection to the blob container succeeded, whether the anomaly files were uploaded, and how many `.txt` files are currently in the container (listing only the first few file names).
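
Before touching Azure, it can help to preview what would be uploaded. The sketch below builds the blob names and text bodies from a small DataFrame without any network calls. The helper `build_alert_files`, the sample row, and the fixed timestamp are hypothetical. Adding the account, amount, and risk level above the alert message is an assumption, since the upload cell writes only the `AlertMessage` text.

```python
import pandas as pd

def build_alert_files(df: pd.DataFrame, timestamp: str) -> dict:
    """Map each blob name to its text content, one entry per DataFrame row."""
    files = {}
    for index, row in df.iterrows():
        name = f"anomaly_summary_{timestamp}_{index}.txt"
        # Assumed layout: a short header followed by the alert message.
        files[name] = (
            f"Account: {row['AccountNumber']}\n"
            f"Amount: KES{row['Amount']}\n"
            f"Risk level: {row['RiskLevel']}\n\n"
            f"{row['AlertMessage']}"
        )
    return files

# Hypothetical row shaped like the Anomaly_Risk_Summary output.
sample = pd.DataFrame({
    "AccountNumber": ["24498"],
    "Amount": [15000],
    "RiskLevel": ["medium"],
    "AlertMessage": ["Please review this transaction."],
})

files = build_alert_files(sample, "20250101_120000")
for name, text in files.items():
    print(name)
    print(text)
```

Each value in `files` could then be passed to `upload_blob` in place of the bare alert message.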

In [None]:
# Install Dependencies
!pip install azure-storage-blob

In [None]:
## Save anomaly alerts to Blob Storage
# This cell uploads each alert message to Azure Blob Storage as a separate .txt file
import pandas as pd
from azure.storage.blob import BlobServiceClient
from datetime import datetime
import json
import os

# Constants
KEY_VAULT_ENDPOINT = "https://pesatransactionskeyvault.vault.azure.net/"
AZURE_OPENAI_API_KEY = getSecret(KEY_VAULT_ENDPOINT, "openaiGPT4o-mini-key")
AZURE_OPENAI_ENDPOINT_URL = getSecret(KEY_VAULT_ENDPOINT, "openaiendpointURL")

# Configuration Section
STORAGE_ACCOUNT_NAME = getSecret(KEY_VAULT_ENDPOINT, "storageAccountName")
# Read the storage key from Key Vault; never hard-code account keys in a notebook.
# (Assumes the key is stored in the vault under the secret name "storageAccountKey".)
STORAGE_ACCOUNT_KEY = getSecret(KEY_VAULT_ENDPOINT, "storageAccountKey")
CONTAINER_NAME = "gpt4osummary"

# Generate timestamp for filename
TIMESTAMP = datetime.now().strftime('%Y%m%d_%H%M%S')
BLOB_NAME = f"anomaly_alerts_{TIMESTAMP}.txt"

# Create the connection to Azure Blob Storage
try:
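    # Build the connection string from adjacent literals so no stray whitespace ends up inside it
    connection_string = (
        "DefaultEndpointsProtocol=https;"
        f"AccountName={STORAGE_ACCOUNT_NAME};"
        f"AccountKey={STORAGE_ACCOUNT_KEY};"
        "EndpointSuffix=core.windows.net"
    )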
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
    
    # Get reference to container (will create if doesn't exist)
    container_client = blob_service_client.get_container_client(CONTAINER_NAME)
    if not container_client.exists():
        container_client.create_container()
    
    print(f"Successfully connected to Azure Blob Storage container: {CONTAINER_NAME}")
except Exception as e:
    print(f"Error connecting to Azure Blob Storage: {str(e)}")
    raise

# Convert Spark DataFrame to text files and upload
try:
    # First convert Spark DataFrame to Pandas
    pandas_df = spark_df.toPandas()
    
    # Ensure the required column exists
    if 'AlertMessage' not in pandas_df.columns:
        raise ValueError("DataFrame is missing required 'AlertMessage' column")
    
    # Upload each summary as a separate text file
    for index, row in pandas_df.iterrows():
        # Create filename with timestamp and index
        txt_filename = f"anomaly_summary_{TIMESTAMP}_{index}.txt"
        
        # Get the summary text
        summary_text = str(row['AlertMessage'])
        
        # Upload to blob storage
        blob_client = container_client.get_blob_client(txt_filename)
        blob_client.upload_blob(summary_text, overwrite=True)
        
        print(f"Uploaded: {txt_filename} (Size: {len(summary_text)} characters)")
    
    print(f"\nSuccessfully uploaded {len(pandas_df)} text files to container {CONTAINER_NAME}")

except Exception as e:
    print(f"Error uploading text files to blob storage: {str(e)}")
    raise

# Verification - list blobs in container (optional)
print("\nCurrent blobs in container:")
try:
    blob_list = container_client.list_blobs()
    txt_files = [blob for blob in blob_list if blob.name.endswith('.txt')]
    
    print(f"Found {len(txt_files)} text files:")
    for blob in txt_files[:5]:
        print(f"- {blob.name} (Size: {blob.size} bytes)")
    if len(txt_files) > 5:
        print(f"- ... and {len(txt_files)-5} more")
    
except Exception as e:
    print(f"Error listing blobs: {str(e)}")