# Real-time Fraud Detection with Redis

This notebook demonstrates building a comprehensive fraud detection system using Redis Stack capabilities.

## Architecture Overview

```
Transaction → Redis Streams → Processing Pipeline → Fraud Scoring → Alerts
                ↓
            RedisJSON (Storage) → RediSearch (Queries) → Vector Search (Similarity)
```

In [None]:
# Install required packages
import sys
import subprocess

def install_packages():
    """Install the notebook's third-party dependencies with pip.

    Every package is installed by its own ``pip`` invocation, launched through
    the interpreter that is running this code (``sys.executable -m pip``).
    A failed install is reported and the remaining packages are still tried.
    """
    packages = [
        'redis[hiredis]',
        'pandas',
        'numpy',
        'scikit-learn',
        'faker',
        'geopy',  # needed by the `from geopy.distance import geodesic` import further down
        'matplotlib',
        'seaborn',
        'plotly',
    ]

    for package in packages:
        try:
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])
        except subprocess.CalledProcessError:
            print(f"❌ Failed to install {package}")
        else:
            print(f"✅ {package} installed successfully")

# Uncomment to install packages
# install_packages()

In [None]:
# Import required libraries
import redis
import json
import time
import random
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from faker import Faker
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')

# Initialize Faker for generating synthetic data
fake = Faker()
Faker.seed(42)
random.seed(42)
np.random.seed(42)

print("📦 All libraries imported successfully!")

## 1. Redis Connection Setup

In [None]:
# Redis connection configuration
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0

# Connect to Redis
try:
    r = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        db=REDIS_DB,
        decode_responses=True
    )

    # Test connection
    r.ping()
    print("✅ Connected to Redis successfully!")

    # Check Redis modules.
    # redis-py parses each MODULE LIST entry into a dict ({'name': ..., 'ver': ...}),
    # so the name has to be read by key; indexing with [1] raises KeyError.
    # The positional branch is kept for clients that hand back the raw flat reply.
    module_names = set()
    for module in r.module_list():
        if isinstance(module, dict):
            name = module.get('name', module.get(b'name'))
        else:
            name = module[1]
        if isinstance(name, bytes):
            name = name.decode('utf-8')
        module_names.add(name)

    required_modules = ['search', 'ReJSON', 'timeseries']
    for module in required_modules:
        if module in module_names:
            print(f"✅ {module} module is available")
        else:
            print(f"❌ {module} module is NOT available")

except redis.ConnectionError:
    print("❌ Failed to connect to Redis. Please ensure Redis Stack is running.")
    print("   Start Redis Stack with: docker run -d --name redis-stack -p 6379:6379 redis/redis-stack:latest")

## 2. Data Models and Schema Design

In [None]:
# Define data schemas
class FraudDetectionSchemas:
    """Descriptive layouts (field name -> type name) of the stored documents.

    The returned dictionaries are documentation only: every value is a type
    name given as a string, or a nested dictionary of such names.
    Each call builds a fresh dictionary.
    """

    @staticmethod
    def transaction_schema():
        """Return the field layout of a transaction document."""
        location = dict(
            latitude="float",
            longitude="float",
            country="string",
            city="string",
        )
        device = dict(
            device_id="string",
            ip_address="string",
            user_agent="string",
        )
        features = dict(
            velocity_1h="int",
            velocity_24h="int",
            amount_percentile="float",
            distance_from_home="float",
            time_since_last_transaction="int",
        )

        # Identifier fields come first, then the remaining fields in display order.
        schema = {name: "string" for name in ("transaction_id", "user_id", "merchant_id")}
        schema.update(
            amount="float",
            currency="string",
            timestamp="datetime",
            location=location,
            device=device,
            merchant_category="string",
            payment_method="string",
            is_fraud="boolean",
            fraud_score="float",
            features=features,
        )
        return schema

    @staticmethod
    def user_profile_schema():
        """Return the field layout of a user profile document."""
        home_location = dict(
            latitude="float",
            longitude="float",
            country="string",
            city="string",
        )
        spending_patterns = dict(
            avg_transaction_amount="float",
            preferred_merchants="list",
            typical_hours="list",
            monthly_spend="float",
        )
        risk_profile = dict(
            risk_score="float",
            fraud_history="int",
            account_age_days="int",
        )
        return dict(
            user_id="string",
            created_date="datetime",
            home_location=home_location,
            spending_patterns=spending_patterns,
            risk_profile=risk_profile,
        )

print("📋 Data schemas defined successfully!")
print("\n🔍 Transaction Schema:")
print(json.dumps(FraudDetectionSchemas.transaction_schema(), indent=2))

## 3. Redis Index Creation

In [None]:
# Create Redis Search indexes for fraud detection
def create_fraud_indexes():
    """(Re)create the search indexes over the notebook's JSON documents.

    Three indexes are built, each restricted to one key prefix:
    ``idx:transactions`` (``transaction:``), ``idx:users`` (``user:``) and
    ``idx:merchants`` (``merchant:``).

    The indexes are created with a raw ``FT.CREATE ... ON JSON`` command.
    Every field is addressed by its JSONPath (``$.location.country``) and
    exposed under an alias that is the path with dots replaced by underscores
    (``location_country``); top-level fields keep their own name.

    Existing indexes are dropped one by one first, so a missing index does not
    prevent the others from being dropped.  Any failure while creating the
    indexes is printed rather than raised.
    """
    # Each entry: (path below the document root, field type, *type options)
    transaction_fields = [
        ('transaction_id', 'TEXT', 'SORTABLE'),
        ('user_id', 'TEXT', 'SORTABLE'),
        ('merchant_id', 'TEXT', 'SORTABLE'),
        ('amount', 'NUMERIC', 'SORTABLE'),
        ('timestamp', 'NUMERIC', 'SORTABLE'),
        ('location.country', 'TEXT'),
        ('location.city', 'TEXT'),
        ('merchant_category', 'TAG', 'SORTABLE'),
        ('payment_method', 'TAG'),
        ('is_fraud', 'TAG'),
        ('fraud_score', 'NUMERIC', 'SORTABLE'),
        ('features.velocity_1h', 'NUMERIC'),
        ('features.velocity_24h', 'NUMERIC'),
        # Vector field for similarity search; '6' is the number of attribute
        # tokens that follow it.
        ('transaction_vector', 'VECTOR', 'HNSW', '6',
         'TYPE', 'FLOAT32', 'DIM', '10', 'DISTANCE_METRIC', 'COSINE'),
    ]
    user_fields = [
        ('user_id', 'TEXT', 'SORTABLE'),
        ('home_location.country', 'TEXT'),
        ('home_location.city', 'TEXT'),
        ('risk_profile.risk_score', 'NUMERIC', 'SORTABLE'),
        ('risk_profile.account_age_days', 'NUMERIC'),
        ('spending_patterns.avg_transaction_amount', 'NUMERIC'),
    ]
    merchant_fields = [
        ('merchant_id', 'TEXT', 'SORTABLE'),
        ('name', 'TEXT'),
        ('category', 'TAG'),
        ('location.country', 'TEXT'),
        ('risk_level', 'TAG'),
    ]

    # (index name, key prefix, fields, message printed after creation)
    indexes = [
        ('idx:transactions', 'transaction:', transaction_fields, "✅ Transaction index created"),
        ('idx:users', 'user:', user_fields, "✅ User index created"),
        ('idx:merchants', 'merchant:', merchant_fields, "✅ Merchant index created"),
    ]

    def schema_args(fields):
        """Flatten field tuples into ``$.path AS alias TYPE [options...]`` tokens."""
        args = []
        for path, *spec in fields:
            args += [f'$.{path}', 'AS', path.replace('.', '_'), *spec]
        return args

    try:
        # Drop existing indexes if they exist (each one independently)
        dropped_any = False
        for index_name, _prefix, _fields, _message in indexes:
            try:
                r.ft(index_name).dropindex()
                dropped_any = True
            except redis.ResponseError:
                # The server rejects the drop when the index does not exist yet.
                pass
        if dropped_any:
            print("🗑️ Dropped existing indexes")

        for index_name, prefix, fields, message in indexes:
            r.execute_command(
                'FT.CREATE', index_name,
                'ON', 'JSON',
                'PREFIX', 1, prefix,
                'SCHEMA', *schema_args(fields),
            )
            print(message)

    except Exception as e:
        # Best effort for the notebook: report the problem and keep going.
        print(f"❌ Error creating indexes: {e}")

create_fraud_indexes()

## 4. Synthetic Data Generation

In [None]:
# Generate synthetic fraud detection data
class FraudDataGenerator:
    """Builds the synthetic merchant and user records used by the notebook.

    Text and coordinates come from the module-level ``fake`` Faker instance;
    numeric values and choices come from the ``random`` module.
    """

    def __init__(self):
        # Merchants are generated first: user profiles pick their preferred
        # merchants from the merchant list.
        self.merchants = self._generate_merchants()
        self.users = self._generate_users()

    def _generate_merchants(self, count=100):
        """Return ``count`` merchant dicts with ids ``merchant_0000``, ``merchant_0001``, ..."""
        category_pool = ['grocery', 'gas', 'restaurant', 'retail', 'online', 'atm', 'pharmacy', 'entertainment']
        risk_levels = ['low', 'medium', 'high']

        generated = []
        for index in range(count):
            record = {'merchant_id': f'merchant_{index:04d}'}
            record['name'] = fake.company()
            record['category'] = random.choice(category_pool)
            record['location'] = {
                'latitude': float(fake.latitude()),
                'longitude': float(fake.longitude()),
                'country': fake.country(),
                'city': fake.city()
            }
            record['risk_level'] = random.choice(risk_levels)
            generated.append(record)
        return generated

    def _generate_users(self, count=1000):
        """Return ``count`` user profile dicts with ids ``user_00000``, ``user_00001``, ..."""
        # Preferred merchants are always sampled from the first 20 merchants.
        candidate_merchant_ids = [m['merchant_id'] for m in self.merchants[:20]]

        generated = []
        for index in range(count):
            latitude = float(fake.latitude())
            longitude = float(fake.longitude())
            created = fake.date_between(start_date='-2y', end_date='today').isoformat()

            home_location = {
                'latitude': latitude,
                'longitude': longitude,
                'country': fake.country(),
                'city': fake.city()
            }
            spending_patterns = {
                'avg_transaction_amount': round(random.uniform(20, 500), 2),
                'preferred_merchants': random.sample(candidate_merchant_ids, 3),
                'typical_hours': sorted(random.sample(range(6, 23), random.randint(3, 8))),
                'monthly_spend': round(random.uniform(500, 5000), 2)
            }
            risk_profile = {
                'risk_score': round(random.uniform(0.1, 0.9), 3),
                'fraud_history': random.randint(0, 3),
                'account_age_days': random.randint(30, 730)
            }

            generated.append({
                'user_id': f'user_{index:05d}',
                'created_date': created,
                'home_location': home_location,
                'spending_patterns': spending_patterns,
                'risk_profile': risk_profile
            })
        return generated

# Initialize data generator
data_gen = FraudDataGenerator()
print(f"📊 Generated {len(data_gen.merchants)} merchants and {len(data_gen.users)} users")

In [None]:
# Store merchants and users in Redis
def store_reference_data():
    """Write every generated merchant and user to Redis as a JSON document.

    Merchants are stored under ``merchant:<merchant_id>`` and users under
    ``user:<user_id>``, each at the document root (``$``).
    """
    print("💾 Storing reference data in Redis...")

    json_client = r.json()
    # (key prefix, id field, records); merchants are written before users
    collections = (
        ('merchant', 'merchant_id', data_gen.merchants),
        ('user', 'user_id', data_gen.users),
    )
    for prefix, id_field, records in collections:
        for record in records:
            json_client.set(f"{prefix}:{record[id_field]}", '$', record)

    print(f"✅ Stored {len(data_gen.merchants)} merchants and {len(data_gen.users)} users")

store_reference_data()

## 5. Transaction Generation and Feature Engineering

In [None]:
import math
from geopy.distance import geodesic

class TransactionGenerator:
    """Derives fraud-detection features and vectors for single transactions.

    Features are computed against ``transaction_history``, an in-memory
    mapping of ``user_id`` to a list of earlier transactions (dicts with at
    least ``'timestamp'`` and ``'amount'``).  The history starts empty and is
    not modified by this class.
    """

    def __init__(self, users, merchants):
        self.users = users
        self.merchants = merchants
        self.transaction_history = {}

    def calculate_distance(self, lat1, lon1, lat2, lon2):
        """Calculate distance between two points in kilometers.

        Uses ``geodesic`` when it is usable and falls back to the haversine
        formula (spherical Earth, radius 6371 km) when that call fails.
        """
        try:
            return geodesic((lat1, lon1), (lat2, lon2)).kilometers
        except Exception:
            # Fallback to haversine formula
            R = 6371  # Earth's radius in km
            dlat = math.radians(lat2 - lat1)
            dlon = math.radians(lon2 - lon1)
            a = (math.sin(dlat / 2) ** 2 +
                 math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
                 math.sin(dlon / 2) ** 2)
            # Rounding can push `a` marginally above 1 for antipodal points,
            # which would make sqrt(1 - a) fail.
            a = min(1.0, a)
            c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
            return R * c

    def calculate_features(self, user_id, merchant, amount, timestamp):
        """Calculate fraud detection features for one prospective transaction.

        Args:
            user_id: id of a user present in ``self.users``.
            merchant: merchant dict with ``location.latitude`` / ``longitude``.
            amount: transaction amount.
            timestamp: transaction time in seconds (same unit as the
                timestamps kept in ``transaction_history``).

        Returns:
            dict with ``velocity_1h`` and ``velocity_24h`` (number of earlier
            transactions in the last hour / day), ``amount_percentile``
            (percentage, 0-100, of the user's earlier amounts that are less
            than or equal to ``amount``; 50.0 without history),
            ``distance_from_home`` (km between merchant and home location) and
            ``time_since_last_transaction`` (seconds; 86400 without history).

        Raises:
            KeyError: if ``user_id`` is not in ``self.users``.
        """
        user_transactions = self.transaction_history.get(user_id, [])

        # Time-based features
        one_hour_ago = timestamp - 3600
        one_day_ago = timestamp - 86400

        velocity_1h = sum(1 for t in user_transactions if t['timestamp'] >= one_hour_ago)
        velocity_24h = sum(1 for t in user_transactions if t['timestamp'] >= one_day_ago)

        # Amount-based features: percentile *rank* of this amount within the
        # user's history.  (Feeding the rank back into np.percentile would
        # return an amount, not a percentage, which breaks the "> 95" rule and
        # the /100 normalisation of the feature vector.)
        user_amounts = [t['amount'] for t in user_transactions]
        if user_amounts:
            not_larger = sum(1 for a in user_amounts if a <= amount)
            amount_percentile = 100.0 * not_larger / len(user_amounts)
        else:
            amount_percentile = 50.0

        # Location-based features
        user = next((u for u in self.users if u['user_id'] == user_id), None)
        if user is None:
            raise KeyError(f"Unknown user_id: {user_id!r}")
        home = user['home_location']
        merchant_location = merchant['location']

        distance_from_home = self.calculate_distance(
            home['latitude'], home['longitude'],
            merchant_location['latitude'], merchant_location['longitude'],
        )

        # Time since last transaction
        if user_transactions:
            last_transaction_time = max(t['timestamp'] for t in user_transactions)
            time_since_last = timestamp - last_transaction_time
        else:
            time_since_last = 86400  # 24 hours if no previous transactions

        return {
            'velocity_1h': velocity_1h,
            'velocity_24h': velocity_24h,
            'amount_percentile': round(amount_percentile, 2),
            'distance_from_home': round(distance_from_home, 2),
            'time_since_last_transaction': time_since_last
        }

    def generate_transaction_vector(self, features, amount, merchant_category):
        """Generate a 10-dimensional feature vector for similarity search.

        Components 0-5 are the features and the amount scaled into [0, 1],
        component 6 encodes the merchant category, and components 7-9 are
        random noise, so two calls with identical inputs return vectors that
        differ in the last three positions.
        """
        import zlib

        # The built-in hash() of a str is salted per interpreter process, so
        # it would give the same category a different encoding in every run.
        # CRC32 is stable across processes.
        category_bytes = str(merchant_category).encode('utf-8')
        category_encoding = zlib.crc32(category_bytes) % 1000 / 1000.0

        vector = [
            min(features['velocity_1h'] / 10.0, 1.0),  # Normalize velocity
            min(features['velocity_24h'] / 50.0, 1.0),
            min(amount / 1000.0, 1.0),  # Normalize amount
            features['amount_percentile'] / 100.0,
            min(features['distance_from_home'] / 1000.0, 1.0),  # Normalize distance
            min(features['time_since_last_transaction'] / 86400.0, 1.0),  # Normalize time
            category_encoding,
            random.random(),  # Random component
            random.random(),
            random.random()
        ]

        return vector

print("🏭 Transaction generator class defined")

## 6. Fraud Detection Rules Engine

In [None]:
class FraudRulesEngine:
    """Rule-based fraud scoring.

    Every rule returns ``(score, reasons)``: a partial risk score and the
    human-readable explanations of the checks that fired.
    ``evaluate_transaction`` adds the partial scores together.
    """

    def __init__(self):
        # Name -> rule lookup; the rules take different arguments.
        self.rules = {
            'velocity_rule': self.velocity_rule,
            'amount_rule': self.amount_rule,
            'location_rule': self.location_rule,
            'time_rule': self.time_rule,
            'merchant_risk_rule': self.merchant_risk_rule
        }

    def velocity_rule(self, features, user_profile):
        """Score the number of recent transactions (last hour and last day)."""
        risk = 0.0
        findings = []

        hourly = features['velocity_1h']
        if hourly > 5:
            risk += 0.3
            findings.append(f"High velocity: {hourly} transactions in 1 hour")

        daily = features['velocity_24h']
        if daily > 20:
            risk += 0.2
            findings.append(f"High daily velocity: {daily} transactions in 24 hours")

        return risk, findings

    def amount_rule(self, amount, features, user_profile):
        """Score the amount against the user's average and percentile."""
        risk = 0.0
        findings = []

        usual = user_profile['spending_patterns']['avg_transaction_amount']

        # Only the largest exceeded multiple counts.
        for multiple, weight in ((5, 0.4), (3, 0.2)):
            if amount > usual * multiple:
                risk += weight
                findings.append(f"Amount {amount} is {multiple}x higher than average {usual}")
                break

        if features['amount_percentile'] > 95:
            risk += 0.2
            findings.append("Amount in 95th percentile for user")

        return risk, findings

    def location_rule(self, features, merchant, user_profile):
        """Score the distance from home and cross-country transactions."""
        risk = 0.0
        findings = []

        km = features['distance_from_home']
        if km > 500:
            # Beyond 1000 km weighs more than the 500-1000 km band.
            risk += 0.3 if km > 1000 else 0.1
            findings.append(f"Transaction {km}km from home location")

        merchant_country = merchant['location']['country']
        home_country = user_profile['home_location']['country']
        if merchant_country != home_country:
            risk += 0.2
            findings.append("International transaction")

        return risk, findings

    def time_rule(self, timestamp, features, user_profile):
        """Score the hour of day and the gap since the previous transaction."""
        risk = 0.0
        findings = []

        # Hour of day as given by datetime.fromtimestamp (local time).
        hour = datetime.fromtimestamp(timestamp).hour
        usual_hours = user_profile['spending_patterns']['typical_hours']
        if hour not in usual_hours:
            risk += 0.1
            findings.append(f"Transaction at unusual hour: {hour}")

        # Less than one minute since the previous transaction
        if features['time_since_last_transaction'] < 60:
            risk += 0.3
            findings.append("Very quick successive transaction")

        return risk, findings

    def merchant_risk_rule(self, merchant):
        """Score the merchant's own risk level."""
        by_level = {
            'high': (0.3, "High-risk merchant"),
            'medium': (0.1, "Medium-risk merchant"),
        }
        outcome = by_level.get(merchant['risk_level'])
        if outcome is None:
            return 0.0, []
        weight, finding = outcome
        return weight, [finding]

    def evaluate_transaction(self, transaction, features, merchant, user_profile):
        """Run all rules and return ``(fraud_score, reasons)``.

        The score is the sum of all rule scores plus 20% of the user's own
        risk score, capped at 1.0.  Reasons are listed in rule order.
        """
        outcomes = (
            self.velocity_rule(features, user_profile),
            self.amount_rule(transaction['amount'], features, user_profile),
            self.location_rule(features, merchant, user_profile),
            self.time_rule(transaction['timestamp'], features, user_profile),
            self.merchant_risk_rule(merchant),
        )

        combined = 0.0
        explanations = []
        for partial, findings in outcomes:
            combined += partial
            explanations.extend(findings)

        # Add user risk profile
        combined += user_profile['risk_profile']['risk_score'] * 0.2

        # Cap the score at 1.0
        return min(combined, 1.0), explanations

# Initialize fraud rules engine
fraud_engine = FraudRulesEngine()
print("🛡️ Fraud rules engine initialized")

## 7. Real-time Transaction Processing

In [None]:
class RealTimeFraudDetector:
    """Scores single transactions and records the outcome in Redis.

    Args:
        redis_client: Redis client used for JSON documents, the
            ``fraud_stream`` stream and the ``idx:transactions`` index.
        fraud_engine: object whose ``evaluate_transaction`` returns
            ``(fraud_score, reasons)``.

    The feature calculator is built from the module-level ``data_gen`` users
    and merchants; its in-memory history is updated after every processed
    transaction.
    """

    def __init__(self, redis_client, fraud_engine):
        self.r = redis_client
        self.fraud_engine = fraud_engine
        self.transaction_gen = TransactionGenerator(data_gen.users, data_gen.merchants)

    def process_transaction(self, user_id, merchant_id, amount, payment_method='card'):
        """Process a single transaction through the fraud detection pipeline.

        The user and merchant are read from Redis (``user:<id>`` /
        ``merchant:<id>``), features and a similarity vector are computed, the
        rules engine scores the transaction, and the result is stored as
        ``transaction:<transaction_id>`` and appended to ``fraud_stream``.

        Returns:
            ``{"error": ...}`` when the user or merchant document is missing;
            otherwise a dict with ``transaction_id``, ``fraud_score``
            (unrounded), ``is_fraud`` (score above 0.7), ``reasons`` and
            ``processing_time_ms``.
        """
        start_time = time.time()

        # Generate transaction ID and timestamp
        transaction_id = f"txn_{int(time.time() * 1000)}_{random.randint(1000, 9999)}"
        timestamp = int(time.time())

        # Get user and merchant data
        user_data = self.r.json().get(f"user:{user_id}")
        merchant_data = self.r.json().get(f"merchant:{merchant_id}")

        if not user_data or not merchant_data:
            return {"error": "User or merchant not found"}

        # Calculate features
        features = self.transaction_gen.calculate_features(user_id, merchant_data, amount, timestamp)

        # Generate transaction vector for similarity search
        transaction_vector = self.transaction_gen.generate_transaction_vector(
            features, amount, merchant_data['category']
        )

        # Create transaction object
        transaction = {
            'transaction_id': transaction_id,
            'user_id': user_id,
            'merchant_id': merchant_id,
            'amount': amount,
            'currency': 'USD',
            'timestamp': timestamp,
            'location': merchant_data['location'],
            'device': {
                'device_id': f"device_{random.randint(1000, 9999)}",
                'ip_address': fake.ipv4(),
                'user_agent': fake.user_agent()
            },
            'merchant_category': merchant_data['category'],
            'payment_method': payment_method,
            'features': features,
            'transaction_vector': transaction_vector
        }

        # Run fraud detection
        fraud_score, reasons = self.fraud_engine.evaluate_transaction(
            transaction, features, merchant_data, user_data
        )

        # Determine if transaction is fraudulent
        is_fraud = fraud_score > 0.7  # Threshold for fraud

        # Add fraud detection results
        transaction['fraud_score'] = round(fraud_score, 3)
        transaction['is_fraud'] = is_fraud
        transaction['fraud_reasons'] = reasons

        # Store transaction in Redis
        transaction_key = f"transaction:{transaction_id}"
        self.r.json().set(transaction_key, '$', transaction)

        # Add to Redis Stream for real-time processing (stream values are strings)
        stream_data = {
            'transaction_id': transaction_id,
            'user_id': user_id,
            'amount': str(amount),
            'fraud_score': str(fraud_score),
            'is_fraud': str(is_fraud)
        }
        self.r.xadd('fraud_stream', stream_data)

        # Update transaction history for user
        history = self.transaction_gen.transaction_history.setdefault(user_id, [])
        history.append({
            'timestamp': timestamp,
            'amount': amount,
            'merchant_id': merchant_id
        })

        processing_time = (time.time() - start_time) * 1000  # Convert to milliseconds

        return {
            'transaction_id': transaction_id,
            'fraud_score': fraud_score,
            'is_fraud': is_fraud,
            'reasons': reasons,
            'processing_time_ms': round(processing_time, 2)
        }

    def find_similar_transactions(self, transaction_vector, limit=5):
        """Find similar transactions using vector search.

        Runs a KNN query for the ``limit`` nearest ``transaction_vector``
        entries in ``idx:transactions``.

        Returns:
            list of dicts with ``transaction_id``, ``fraud_score``,
            ``is_fraud`` and ``similarity_score`` (the KNN distance, smaller
            means closer), sorted by that distance.  An empty list is returned
            and the error printed when the search fails.
        """
        try:
            # Query lives in the `query` submodule; it is not an attribute of
            # the `redis.commands.search` package itself.
            from redis.commands.search.query import Query

            # Convert vector to bytes for Redis (float32, matching the index)
            vector_bytes = np.array(transaction_vector, dtype=np.float32).tobytes()

            # Perform vector search
            knn_query = (
                Query(f"*=>[KNN {limit} @transaction_vector $vector AS score]")
                .sort_by('score')
                .return_fields('transaction_id', 'fraud_score', 'is_fraud', 'score')
                .dialect(2)
            )
            result = self.r.ft('idx:transactions').search(
                knn_query,
                query_params={'vector': vector_bytes}
            )

            return [{
                'transaction_id': doc.transaction_id,
                'fraud_score': float(doc.fraud_score),
                # NOTE(review): the textual form of a JSON boolean in search
                # results is assumed to be 'true' or '1' - confirm on the server.
                'is_fraud': str(doc.is_fraud).lower() in ('true', '1'),
                'similarity_score': float(doc.score)
            } for doc in result.docs]

        except Exception as e:
            print(f"Vector search error: {e}")
            return []

# Initialize real-time fraud detector
fraud_detector = RealTimeFraudDetector(r, fraud_engine)
print("🚀 Real-time fraud detector initialized")

## 8. Demo: Process Sample Transactions

In [None]:
# Generate and process sample transactions
def run_fraud_detection_demo(num_transactions=20):
    """Process random sample transactions and print a summary.

    For each transaction a random user and merchant are chosen; about 20% of
    the amounts are drawn from a deliberately high range (1000-5000), the rest
    from 10-200.

    Args:
        num_transactions: number of transactions to attempt.

    Returns:
        list with the result dict of every attempt, including ``{"error": ...}``
        entries for transactions the detector could not process.
    """
    print(f"🎯 Processing {num_transactions} sample transactions...\n")

    results = []
    fraud_count = 0
    failed_count = 0
    total_processing_time = 0

    for _ in range(num_transactions):
        # Select random user and merchant
        user = random.choice(data_gen.users)
        merchant = random.choice(data_gen.merchants)

        # Generate transaction amount (some intentionally suspicious)
        if random.random() < 0.2:  # 20% chance of suspicious amount
            amount = random.uniform(1000, 5000)  # High amount
        else:
            amount = random.uniform(10, 200)  # Normal amount

        # Process transaction
        result = fraud_detector.process_transaction(
            user['user_id'],
            merchant['merchant_id'],
            round(amount, 2)
        )
        results.append(result)

        # The detector returns only an 'error' key when the user or merchant
        # document is missing; such results carry no score or timing.
        if 'error' in result:
            failed_count += 1
            print(f"⚠️ Transaction skipped: {result['error']}")
            continue

        total_processing_time += result['processing_time_ms']

        if result['is_fraud']:
            fraud_count += 1
            print(f"🚨 FRAUD DETECTED - Transaction {result['transaction_id']}")
            print(f"   Score: {result['fraud_score']:.3f}")
            print(f"   Reasons: {', '.join(result['reasons'])}")
            print(f"   Processing time: {result['processing_time_ms']:.2f}ms\n")
        else:
            print(f"✅ Transaction {result['transaction_id']} - Score: {result['fraud_score']:.3f} ({result['processing_time_ms']:.2f}ms)")

    # Summary statistics over the transactions that were actually processed
    processed_count = len(results) - failed_count
    if processed_count:
        avg_processing_time = total_processing_time / processed_count
        fraud_rate = (fraud_count / processed_count) * 100
    else:
        avg_processing_time = 0.0
        fraud_rate = 0.0

    print(f"\n📊 SUMMARY:")
    print(f"   Total transactions: {num_transactions}")
    if failed_count:
        print(f"   Skipped (user or merchant not found): {failed_count}")
    print(f"   Fraud detected: {fraud_count} ({fraud_rate:.1f}%)")
    print(f"   Average processing time: {avg_processing_time:.2f}ms")
    # Throughput is undefined without a measurable average time.
    if avg_processing_time > 0:
        print(f"   Throughput: {1000/avg_processing_time:.0f} transactions/second")

    return results

# Run the demo
demo_results = run_fraud_detection_demo(15)