# Real-time fraud detection with vector search for MemoryDB

## 1. Components
![Architecture](images/AWS-OnAir_01-Architecture.png)

## 2. Install packages
![Packages](images/AWS-OnAir_02-Packages.jpeg)

In [1]:
# Install/upgrade pip and other packages in the current Jupyter kernel
import sys
!{sys.executable} -m pip install --upgrade pip
!{sys.executable} -m pip install python-dotenv
!{sys.executable} -m pip install pandas
!{sys.executable} -m pip install numpy
!{sys.executable} -m pip install -U valkey # Note the Valkey library



In [2]:
import pandas as pd
import numpy as np
import uuid
import datetime
import os
import time
from dotenv import load_dotenv

from valkey.commands.search.field import VectorField, TextField
from valkey.commands.search.field import NumericField, TagField
from valkey.commands.search.query import Query
from valkey.commands.search.result import Result
from valkey.commands.search.indexDefinition import IndexDefinition, IndexType
from valkey.cluster import ValkeyCluster as MemoryDB

## 3. Connect to MemoryDB
![Connection](images/AWS-OnAir_03-Connection.jpeg)

In [3]:
# Read connection settings from env.txt into the environment; fall back to localhost:6379 when unset.
load_dotenv('env.txt')

memorydb_host = os.environ.get("MEMORYDB_HOST", "localhost")
# Environment variables are always strings, while the default is an int; cast so the
# port has one consistent type (and a malformed value fails here with a clear error).
memorydb_port = int(os.environ.get("MEMORYDB_PORT", 6379))

# Connect over TLS (ssl=True).
# NOTE(review): ssl_cert_reqs="none" disables server-certificate verification. That is
# convenient for demos/tunnels but removes MITM protection; enable verification in production.
# decode_responses=False keeps replies as bytes; the vectors stored later are raw float32 bytes.
mdb = MemoryDB(host=memorydb_host, port=memorydb_port, ssl=True, decode_responses=False, ssl_cert_reqs="none")

print("Ping status of MemoryDB = " + str(mdb.ping()))

Ping status of MemoryDB = True


## 4. [Credit Card Fraud Detection Source](https://www.kaggle.com/datasets/mlg-ulb/creditcardfraud)

Originally from Kaggle.

This dataset contains transactions that occurred over two days, with __492__ _frauds_ out of __284,807__ _transactions_.  
The dataset is highly unbalanced: the positive class (_frauds_, `Class` = 1) accounts for __0.172%__ of all _transactions_.

It contains only numerical input variables:
    
- Feature __'Time'__ contains the seconds elapsed between each transaction and the first transaction in the dataset.
- Features __V1__, __V2__, …, __V28__ are 28 dimensions of vectorized (_embedding_) data representing transaction details such as time, location, and so on.
- Feature __'Amount'__ is the transaction amount; it can be used, for example, for example-dependent cost-sensitive learning.
- Feature __'Class'__ is the response variable and it takes value 1 in case of fraud and 0 otherwise.

![Needle in a haystack](images/AWS-OnAir_04-NeedleHaystack.jpeg)

In [None]:
# Load the Kaggle credit-card transactions and report the table's size.
df = pd.read_csv("data/creditcard.csv")
print(f"Number of rows in dataset: {len(df.index):,} Number of columns: {len(df.columns):,}\n")

# Preview the first rows (rendered by Jupyter as the cell's output).
df.head()

In [None]:
# Validate data: all 28 embedding columns (V1..V28) must exist before they are
# combined into one vector per row.
embedding_columns = ['V' + str(n) for n in range(1, 29)]

missing_columns = [name for name in embedding_columns if name not in df.columns]
if missing_columns:
    raise ValueError(f"The following embedding columns are missing from the DataFrame: {missing_columns}")

# One Python list of 28 floats per row; this column is what gets written to MemoryDB.
df['Vector'] = df[embedding_columns].to_numpy().tolist()

print(f"Number of columns in dataset: {df.shape[1]:,}\n")
print(f"{df['Vector'].head().to_string()}\n")

## 5. Create index in MemoryDB

![Index](images/AWS-OnAir_05-Index.jpeg)

In [None]:
def generate_key(prefix = ""):
    """Return a fresh, unique key: ``prefix`` followed by a random UUID4 string."""
    return "".join((prefix, str(uuid.uuid4())))

def create_hnsw_index(mdb, index_name, vector_field_name, initial_size, 
                      vector_dimensions=len(embedding_columns), distance_metric='L2', M_EDGES=16, EF_CONSTRUCT=512, key_prefix=''):
    """Create an HNSW vector index over hashes whose key names start with ``key_prefix``.

    The schema indexes one FLOAT32 vector field plus the numeric ``amount`` and
    ``class`` hash fields.

    Args:
        mdb: connected client exposing ``.ft(index_name)``.
        index_name: name of the index to create.
        vector_field_name: hash field holding the raw float32 vector bytes.
        initial_size: INITIAL_CAP hint, i.e. the number of vectors expected.
        vector_dimensions: vector length. The default is ``len(embedding_columns)``,
            evaluated once when this function is defined.
        distance_metric: 'L2' (Euclidean distance), 'IP' (dot product) or
            'COSINE' (angle between vectors).
        M_EDGES: HNSW ``M``. More edges per node give a better-connected graph and
            better recall, at the cost of memory.
        EF_CONSTRUCT: HNSW ``EF_CONSTRUCTION``. A larger candidate list during the
            build gives a more thorough, but slower, construction.
        key_prefix: only keys starting with this prefix are indexed; an empty
            prefix matches every key name.

    Query-time ``EF_RUNTIME`` is not set here; it is a per-search setting.

    Any error raised while creating the index (for example because it already
    exists) is printed rather than raised, so the notebook cell can be re-run.
    """
    schema = [
        VectorField(
            vector_field_name,
            "HNSW",
            {
                "TYPE": "FLOAT32",
                "DIM": vector_dimensions,
                "DISTANCE_METRIC": distance_metric,
                "INITIAL_CAP": initial_size,
                "M": M_EDGES,
                "EF_CONSTRUCTION": EF_CONSTRUCT,
            },
        ),
        NumericField("amount"),
        NumericField("class"),
    ]

    # To start from scratch, drop the index first, e.g. mdb.ft(index_name).dropindex().
    try:
        mdb.ft(index_name).create_index(schema, definition=IndexDefinition(prefix=[key_prefix]))
    except Exception as e:
        # Best-effort so the cell can be re-run. Report the actual server error
        # instead of assuming the failure means the index already existed.
        print(f"Index {index_name} was not created (it may already exist): {e}")
    else:
        print(f"Index {index_name} created successfully.")

## Behind the scenes
```
FT.CREATE "ccfd_hnsw_index"
ON HASH
PREFIX "1" "tsx:"
SCHEMA "VEC" 
VECTOR "HNSW" "12" "TYPE" "FLOAT32" "DIM" "28" "DISTANCE_METRIC" "Cosine" 
INITIAL_CAP "482" "M" "16" 
EF_CONSTRUCTION "512" "amount" "NUMERIC" "class" "NUMERIC"
```

![KNN distance metrics](images/AWS-OnAir_08-KNNdistanceMetrics.png)

In [None]:
# Index configuration
index_name = "ccfd_hnsw_index"
KEY_PREFIX = "tsx:"                              # every transaction hash key starts with this
vector_field_name = "VEC"                        # hash field that stores the float32 vector bytes
vector_dimensions = len(embedding_columns)       # only the embedding columns form the vector
# Capacity = all fraudulent rows except the last 10, which are held back for querying.
number_of_vectors = int((df["Class"] == 1).sum()) - 10

print(f"Creating Vector Index {index_name} on Field {vector_field_name} Expecting {number_of_vectors:,} vectors")

# Start clean: FLUSHALL wipes every key in the database, not just this demo's keys.
# (Alternative: mdb.ft(index_name).dropindex())
mdb.flushall()

# Create an empty index in MemoryDB
create_hnsw_index(
    mdb,
    index_name,
    vector_field_name,
    number_of_vectors,
    vector_dimensions=vector_dimensions,
    distance_metric='Cosine',
    M_EDGES=16,
    EF_CONSTRUCT=512,
    key_prefix=KEY_PREFIX,
)

print(f"\nVector Information: {mdb.ft(index_name).info()}")
print(f"\nNumber of indexed vectors: {mdb.ft(index_name).info()['num_indexed_vectors']}")

## 6. Load vector embeddings into MemoryDB

![Load](images/AWS-OnAir_06-Load.jpeg)

In [None]:
%%time
# Load data into MemoryDB
BATCH_SIZE = 100

# Import tqdm for jupyter notebook
from tqdm.notebook import tqdm
# Enable tqdm for Pandas
tqdm.pandas()

count = 0
pipe = mdb.pipeline()

for index, row in tqdm(df.loc[df['Class'] == 1].iloc[:number_of_vectors-1].iterrows(), total=number_of_vectors-1):
#for index, row in tqdm(df.iloc[:250000].iterrows(), total=250000):
    
    key = generate_key(prefix=KEY_PREFIX)
    vector = np.array(row['Vector'], dtype=np.float32).tobytes()
    
    pipe.hset(key, mapping={
        vector_field_name: vector,
        'amount': row['Amount'],
        'class': row['Class']
        })
    
    if index % BATCH_SIZE == 0:
        pipe.execute()
        pipe = mdb.pipeline()
    count += 1
    
pipe.execute()

print(f"\nData indexed successfully. Keys created: {count}\n")
print(f"Indexed info: {mdb.ft(index_name).info()}")
time.sleep(1)
print(f"\nNumber of indexed vectors: {mdb.ft(index_name).info()['num_indexed_vectors']}\n")

## 7. Find fraudulent transactions

![Find-Tsx](images/AWS-OnAir_07-Find.jpeg)

In [None]:
def similarity_search(mdb, index_name, query_vector, top_n=5, vector_field_name="VEC", ef_runtime=10):
    """Return the ``top_n`` nearest neighbours of ``query_vector`` in ``index_name``.

    Runs a KNN query (query dialect 2) against the vector field. Results are sorted
    ascending by the distance alias ``score``, and each result document carries the
    fields ``score``, ``amount`` and ``class``.

    Args:
        mdb: connected client exposing ``.ft(index_name)``.
        index_name: name of the vector index to search.
        query_vector: sequence of floats; it is converted to float32 bytes, so its
            length should match the index's dimension.
        top_n: number of neighbours (K) to return.
        vector_field_name: name of the indexed vector field (default "VEC").
        ef_runtime: HNSW EF_RUNTIME for this query. Larger values examine more
            candidates, giving better recall but a slower query.

    Returns:
        The ``docs`` list of the search result.
    """
    # Convert the query vector to bytes
    query_vector_bytes = np.array(query_vector, dtype=np.float32).tobytes()

    # EF_RUNTIME only takes effect inside the KNN clause. A query param that the
    # query string never references is ignored.
    knn_clause = f"*=>[KNN {top_n} @{vector_field_name} $query_vec EF_RUNTIME {ef_runtime} AS score]"
    query = (
        Query(knn_clause)
        .sort_by("score")
        .return_fields("score", "amount", "class")
        .paging(0, top_n)
        .dialect(2)
    )

    params = {"query_vec": query_vector_bytes}

    return mdb.ft(index_name).search(query, query_params=params).docs

In [None]:

# Amounts of the last 10 fraudulent transactions (the rows held back from the index).
result = df.loc[df['Class'] == 1, 'Amount'].tail(10)
print(result.to_string())


In [None]:
%%time

query_vector = df.iloc[281143]['Vector']

results = similarity_search(mdb, index_name, query_vector, top_n=5)

# print(results)

for doc in results:
        score = round(1 - float(doc.score), 2)
        id = doc.id
        print(f"Vector {id} has a score {score}")
        # amount = doc.amount
        # print(f"Vector {id} has a score {score} for the amount {amount}")
print("\n")

### Query behind the scenes

```
FT.SEARCH "ccfd_hnsw_index" "*=>[KNN 5 @VEC $query_vec AS score]" 
RETURN "3" "score" "amount" "class" 
SORTBY "score" "ASC" "DIALECT" "2" "LIMIT" "0" "5" 
"params" "2" "query_vec" "\x1e!2\xbf\x0b\xef\x14?\x184\x18@\xbd\xd5\x81=?\x82\xa8>\xa5T\xe6\xbe\...\xbf"
```

In [None]:
import random

# Spot-check the index with 10 queries. Iterations 3 and 5 use the fixed rows 281144
# and 4920; every other iteration uses a random row in [0, 250000]. For each query row,
# print its class followed by its 5 nearest indexed neighbours.
print('#' * 50)

for attempt in range(10):
    if attempt == 3:
        row_idx = 281144
    elif attempt == 5:
        row_idx = 4920
    else:
        row_idx = random.randint(0, 250_000)
    row = df.iloc[row_idx]

    results = similarity_search(mdb, index_name, row['Vector'], top_n=5)
    print(f"Classification {row.Class} Transaction ID {row_idx}")
    for doc in results:
        # 1 - distance = similarity; doc.id is used directly to avoid shadowing built-in id()
        score = round(1 - float(doc.score), 2)
        print(f"Index: Vector {doc.id} has a score {score}")

    print('#' * 50)