<a href="https://colab.research.google.com/github/Coltcult/Buff/blob/main/XMR_Guardian_AI_(Conceptual).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import IsolationForest, RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score, precision_recall_fscore_support
from sklearn.preprocessing import StandardScaler
from collections import deque
import time
import hashlib
import logging
import threading

# ===============================
# Configuration
# ===============================

# Logging: configure the root logger once (timestamp - level - message) and
# create the module-level logger used by every function and class below.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Default value of the `threshold` argument of TransactionAnomalyDetector.
ANOMALY_THRESHOLD = 0.05  # Adjust as needed

# Default data paths (replace with your actual data paths). Each one is the
# default `filename` argument of the matching load_* function below.
MALICIOUS_ADDRESSES_FILE = "malicious_addresses.csv"  # CSV with an 'address' column
EXCHANGE_KYC_DATA = "exchange_kyc_data.csv" # CSV with KYC data, including a 'user_id' and relevant fields
NETWORK_DATA_FILE = "network_transactions.csv" # CSV with network transaction data

# Columns that load_network_data() requires in the network CSV and that are
# selected as model input for anomaly detection - adjust based on your dataset.
TRANSACTION_FEATURES = ['amount', 'timestamp', 'sender_balance', 'receiver_balance', 'time_since_last_transaction']

# ===============================
# Data Loading and Preprocessing
# ===============================

def load_malicious_addresses(filename=MALICIOUS_ADDRESSES_FILE):
    """
    Reads the malicious-address list from a CSV file.

    Args:
        filename: Path of a CSV file that must contain an 'address' column.

    Returns:
        set: The values of the 'address' column. An empty set is returned
        (and the problem is logged) if the file cannot be read or the
        column is missing.
    """
    try:
        frame = pd.read_csv(filename)
        if 'address' in frame.columns:
            return set(frame['address'])
        raise ValueError("'address' column not found in malicious addresses file.")
    except Exception as e:
        # Best-effort loader: report the failure and fall through to the
        # empty default instead of propagating.
        logger.error(f"Error loading malicious addresses: {e}")
    return set()

def load_network_data(filename=NETWORK_DATA_FILE):
    """
    Loads network transaction data from a CSV file.

    Args:
        filename: Path of a CSV file that must contain every column listed
            in TRANSACTION_FEATURES.

    Returns:
        pd.DataFrame: Only the TRANSACTION_FEATURES columns, with rows that
        have a missing value in any of those columns removed. An empty
        DataFrame is returned (and the problem is logged) if the file cannot
        be read or a required column is missing.
    """
    try:
        df = pd.read_csv(filename)
        # Basic data cleaning - adjust as necessary for your data
        for col in TRANSACTION_FEATURES:
            if col not in df.columns:
                raise ValueError(f"Column '{col}' not found in network data file.")
        # Select the feature columns *before* dropping incomplete rows, so a
        # NaN in an unrelated column (which is discarded anyway) does not
        # throw away an otherwise usable transaction.
        return df[TRANSACTION_FEATURES].dropna()
    except Exception as e:
        logger.error(f"Error loading network data: {e}")
        return pd.DataFrame()  # Return empty DataFrame on error

def load_exchange_kyc_data(filename=EXCHANGE_KYC_DATA):
    """
    Reads exchange KYC records from a CSV file.

    Args:
        filename: Path of the KYC CSV file.

    Returns:
        pd.DataFrame: The file contents with every row containing a missing
        value removed, or an empty DataFrame (after logging the error) if
        the file cannot be loaded.
    """
    try:
        # Basic cleaning: discard incomplete records.
        return pd.read_csv(filename).dropna()
    except Exception as e:
        logger.error(f"Error loading exchange KYC data: {e}")
        return pd.DataFrame()

# ===============================
# Feature Engineering
# ===============================

def create_transaction_features(df):
    """
    Creates features from transaction data.  This is just an example;
    add more sophisticated feature engineering for a real-world application.

    The frame is modified in place and also returned.

    Args:
        df (pd.DataFrame): Must contain 'amount' and 'timestamp'. A 'sender'
            column is optional.

    Returns:
        pd.DataFrame: The same frame with 'amount' cast to float, 'timestamp'
        parsed to datetime, and the columns 'time_since_last_transaction',
        'hour_of_day' and 'day_of_week' present.
    """
    df['amount'] = df['amount'].astype(float)
    df['timestamp'] = pd.to_datetime(df['timestamp'])

    if 'sender' in df.columns:
        # Seconds since the previous row of the same sender; the first row of
        # each sender has no predecessor and gets 0. Rows are compared in
        # their existing order, so the input is expected to be chronological.
        gaps = df.groupby('sender')['timestamp'].diff().dt.total_seconds()
        df['time_since_last_transaction'] = gaps.fillna(0)
    elif 'time_since_last_transaction' not in df.columns:
        # Without a sender there is nothing to group by. Keep a precomputed
        # value when the data already carries one, otherwise default to 0
        # instead of failing with a KeyError.
        df['time_since_last_transaction'] = 0.0

    df['hour_of_day'] = df['timestamp'].dt.hour
    df['day_of_week'] = df['timestamp'].dt.dayofweek
    return df

def create_kyc_features(df):
    """
    Creates features from KYC data.  This is a placeholder; you'd add
    features relevant to identity verification (e.g., consistency checks,
    flags for suspicious patterns).

    The frame is modified in place and also returned.

    Args:
        df (pd.DataFrame): Must contain 'name' and 'email' columns.

    Returns:
        pd.DataFrame: The same frame with two added columns:
        'is_common_name' (1 if the name is in a small hard-coded list,
        else 0) and 'email_domain' (the text between the first and second
        '@' of the email, or '' when the value is not a string or contains
        no '@').
    """
    # Example: Check for common first/last name
    common_names = ['John Smith', 'Jane Doe', 'Robert Jones']
    df['is_common_name'] = df['name'].isin(common_names).astype(int)

    def _email_domain(value):
        # Missing or non-text emails (e.g. NaN) have no domain; testing
        # `'@' in value` on them would raise a TypeError.
        if not isinstance(value, str) or '@' not in value:
            return ''
        return value.split('@')[1]

    df['email_domain'] = df['email'].apply(_email_domain)
    return df

# ===============================
# Anomaly Detection Model
# ===============================
class TransactionAnomalyDetector:
    """
    Detects anomalous transactions using Isolation Forest.

    Features are standardised with a StandardScaler before being passed to
    the forest, both for training and for prediction.
    """
    def __init__(self, threshold=ANOMALY_THRESHOLD, score_cutoff=0.0):
        """
        Args:
            threshold: Passed to IsolationForest as `contamination`, i.e. the
                expected fraction of outliers in the training data.
            score_cutoff: Decision-function value below which a score is
                reported as anomalous by `is_anomalous`. The default of 0 is
                the boundary at which IsolationForest itself separates
                outliers (negative scores) from inliers.
        """
        self.model = IsolationForest(contamination=threshold)
        self.threshold = threshold
        self.score_cutoff = score_cutoff
        self.scaler = StandardScaler() # Scale the features

    def train(self, df):
        """
        Trains the anomaly detection model.

        Args:
            df (pd.DataFrame): Training features. An empty frame is skipped
                with a warning; any fitting error is logged, not raised.
        """
        if df.empty:
            logger.warning("Training data is empty. Model not trained.")
            return
        try:
            # Scale the features *before* training
            scaled_data = self.scaler.fit_transform(df)
            self.model.fit(scaled_data)
            logger.info("Anomaly detection model trained.")
        except Exception as e:
            logger.error(f"Error training anomaly detection model: {e}")

    def predict(self, df):
        """
        Predicts anomaly scores for new data.

        Args:
            df (pd.DataFrame): Features with the same columns used in `train`.

        Returns:
            np.ndarray: One decision-function score per row (lower means more
            abnormal). An array of zeros is returned when the model has not
            been trained or scoring fails.
        """
        # `estimators_` only exists on a fitted IsolationForest.
        if not hasattr(self.model, 'estimators_'):
            logger.warning("Model not trained. Returning default anomaly scores.")
            return np.zeros(len(df))  # Return all zeros if not trained

        try:
            # Scale the data *before* making predictions
            scaled_data = self.scaler.transform(df)
            anomaly_scores = self.model.decision_function(scaled_data)
            return anomaly_scores
        except Exception as e:
            logger.error(f"Error predicting anomalies: {e}")
            return np.zeros(len(df)) # Return default

    def is_anomalous(self, score):
        """
        Check if a decision-function score marks an anomaly.

        The score is compared with `score_cutoff`, not with `threshold`:
        `threshold` is a contamination fraction and lives on a different
        scale than decision-function scores. Comparing against it flagged
        inliers with small positive scores, as well as the neutral score 0
        that `predict` returns when no trained model is available.
        """
        return score < self.score_cutoff
# ===============================
# Malicious Address Detection
# ===============================

class MaliciousAddressDetector:
    """
    Flags transactions whose sender or receiver is a known malicious address.
    """
    def __init__(self, malicious_addresses=None):
        # A missing or empty argument falls back to a fresh empty set.
        self.malicious_addresses = malicious_addresses or set()

    def update_malicious_addresses(self, addresses):
        """Replaces the stored addresses; anything that is not a set is ignored with a warning."""
        if not isinstance(addresses, set):
            logger.warning("Addresses not a set.  Not updating.")
            return
        self.malicious_addresses = addresses

    def check_transaction(self, sender, receiver):
        """
        Tests both parties of a transaction against the stored addresses.

        Returns:
            tuple: (is_malicious, reason) - a bool and a human-readable
            explanation of the verdict.
        """
        known = self.malicious_addresses
        if not known:
            return False, "No malicious addresses loaded."
        involved = any(party in known for party in (sender, receiver))
        if involved:
            return True, "Transaction involves a malicious address."
        return False, "Transaction is not suspicious."

# ===============================
# KYC/AML Risk Assessment
# ===============================
class KYCRiskAssessor:
    """
    Assesses the risk associated with a user based on their KYC data.

    A RandomForestClassifier is trained on standardised, one-hot encoded KYC
    features; the encoded column layout seen during training is remembered so
    that single users can be encoded the same way later.
    """
    def __init__(self):
        self.model = RandomForestClassifier()
        self.scaler = StandardScaler()
        self.trained = False
        # Encoded feature columns (names and order) the current model expects.
        self.feature_columns = []

    def train(self, df):
        """
        Trains a model to predict risk (e.g., fraud, money laundering).

        Args:
            df (pd.DataFrame): KYC records including a 'risk_level' target
                column (e.g., 0: low, 1: medium, 2: high) plus the 'name' and
                'email' columns needed by create_kyc_features.

        Errors are logged rather than raised. A previously trained
        scaler/model pair is only replaced once the new pair has been fitted
        successfully.
        """
        if df.empty:
            logger.warning("KYC training data is empty. Model not trained.")
            return
        try:
            if 'risk_level' not in df.columns:
                raise ValueError("'risk_level' column not found in KYC data.")
            labels = df['risk_level']
            features = create_kyc_features(df.drop('risk_level', axis=1))
            # Handle categorical variables.
            # NOTE(review): every non-numeric column is one-hot encoded here,
            # including identifier-like ones such as user_id or email if they
            # are present - confirm that is intended.
            features = pd.get_dummies(features)

            X_train, X_test, y_train, y_test = train_test_split(
                features, labels, test_size=0.2, random_state=42)

            # Fit the scaler on the training split only, so the held-out rows
            # do not leak into the statistics used to evaluate the model.
            scaler = StandardScaler()
            model = RandomForestClassifier()
            model.fit(scaler.fit_transform(X_train), y_train)

            # Publish the new pair together with its column layout only after
            # fitting succeeded, so a failed retrain leaves the old state intact.
            self.scaler = scaler
            self.model = model
            self.feature_columns = list(features.columns)
            self.trained = True
            logger.info("KYC risk assessment model trained.")

            # Evaluate the model (optional, but good practice)
            y_pred = model.predict(scaler.transform(X_test))
            accuracy = accuracy_score(y_test, y_pred)
            report = classification_report(y_test, y_pred)
            precision, recall, f1, _ = precision_recall_fscore_support(y_test, y_pred, average='weighted')
            logger.info(f"KYC Model Evaluation - Accuracy: {accuracy:.2f}, Precision: {precision:.2f}, Recall: {recall:.2f}, F1: {f1:.2f}")
            logger.info(f"Classification Report:\n{report}")

        except Exception as e:
            logger.error(f"Error training KYC risk assessment model: {e}")

    def assess_risk(self, user_data):
        """
        Assesses the risk level of a given user.

        Args:
            user_data (pd.DataFrame): KYC information of the user; only the
                first row's prediction is returned. The frame is not modified.

        Returns:
            tuple: (risk_level, message). The risk level defaults to 0 when
            the model is untrained, the input is invalid, or an error occurs;
            the message states which of these happened.
        """
        if not self.trained:
            logger.warning("KYC model not trained. Returning default risk level (0).")
            return 0, "KYC model not trained"  # Default: 0 (low risk)

        try:
            #  validation
            if not isinstance(user_data, pd.DataFrame) or user_data.empty:
                logger.warning("Invalid user data. Returning default risk level (0).")
                return 0, "Invalid user data"

            # Work on a copy: create_kyc_features adds columns in place and
            # the caller's frame must stay untouched.
            user_features = create_kyc_features(user_data.copy())
            user_features = pd.get_dummies(user_features)

            # Align with the training layout: columns unseen for this user
            # become 0, columns unknown to the model are dropped, and the
            # order matches training. The layout is taken from `train`
            # because the forest is fitted on a plain array and therefore
            # carries no feature names of its own.
            user_features = user_features.reindex(columns=self.feature_columns, fill_value=0)
            scaled_user_data = self.scaler.transform(user_features)
            risk_level = self.model.predict(scaled_user_data)[0]  # Get the prediction
            return risk_level, "Risk assessed"

        except Exception as e:
            logger.error(f"Error assessing KYC risk: {e}")
            return 0, f"Error: {e}"

# ===============================
# Main Class: XMR Guardian AI
# ===============================

class XMRGuardianAI:
    """
    Main class that integrates all the components.

    On construction the data files are loaded, the models are trained, and a
    daemon thread is started that reloads everything at a fixed interval.
    Call `stop()` to end that thread.

    NOTE(review): `load_data` retrains the detectors while holding
    `data_load_lock`, but `process_transaction` and `assess_user_risk` do not
    take that lock, so they may run concurrently with a reload.
    """
    def __init__(self,
                 malicious_addresses_file=MALICIOUS_ADDRESSES_FILE,
                 exchange_kyc_file=EXCHANGE_KYC_DATA,
                 network_data_file=NETWORK_DATA_FILE,
                 update_interval_seconds=3600):
        """
        Args:
            malicious_addresses_file: CSV path for load_malicious_addresses.
            exchange_kyc_file: CSV path for load_exchange_kyc_data.
            network_data_file: CSV path for load_network_data.
            update_interval_seconds: Pause between background reloads
                (default: one hour).
        """
        self.malicious_address_detector = MaliciousAddressDetector()
        self.anomaly_detector = TransactionAnomalyDetector()
        self.kyc_risk_assessor = KYCRiskAssessor()
        self.transaction_history = deque(maxlen=1000)  # Store last 1000 transactions
        self.malicious_addresses_file = malicious_addresses_file
        self.exchange_kyc_file = exchange_kyc_file
        self.network_data_file = network_data_file
        self.update_interval_seconds = update_interval_seconds
        self.running = True
        # Set by stop(); lets the update thread wake up immediately instead
        # of finishing a long sleep first.
        self._stop_event = threading.Event()
        self.data_load_lock = threading.Lock()

        # Load data at startup
        self.load_data()

        # Start a background thread to periodically update data
        self.update_thread = threading.Thread(target=self.periodic_update)
        self.update_thread.daemon = True  # Thread exits when main program exits
        self.update_thread.start()

    @staticmethod
    def _anomaly_features(df):
        """
        Builds the numeric model input for the anomaly detector.

        Selects the TRANSACTION_FEATURES columns, converts 'timestamp' to
        seconds since the Unix epoch and casts everything to float. Training
        and prediction both go through this helper so that the detector sees
        the same columns, in the same order and encoding, in both cases.

        Raises:
            KeyError: If a TRANSACTION_FEATURES column is missing.
            ValueError / TypeError: If a value cannot be converted.
        """
        features = df[TRANSACTION_FEATURES].copy()
        timestamps = pd.to_datetime(features['timestamp'], utc=True)
        epoch = pd.Timestamp("1970-01-01", tz="UTC")
        features['timestamp'] = (timestamps - epoch).dt.total_seconds()
        return features.astype(float)

    def load_data(self):
        """Loads data (malicious addresses, KYC, network) with thread safety."""
        with self.data_load_lock:  # Acquire the lock
            start_time = time.time()
            malicious_addresses = load_malicious_addresses(self.malicious_addresses_file)
            kyc_data = load_exchange_kyc_data(self.exchange_kyc_file)
            network_data = load_network_data(self.network_data_file)  # Load network data

            self.malicious_address_detector.update_malicious_addresses(malicious_addresses)
            if not network_data.empty:
                try:
                    # Per-sender features can only be derived when the sender
                    # column is available; otherwise the columns delivered by
                    # the loader are used as they are.
                    if 'sender' in network_data.columns:
                        network_data = create_transaction_features(network_data)
                    training_features = self._anomaly_features(network_data)
                except Exception as e:
                    # A malformed file must not abort the whole (re)load.
                    logger.error(f"Error preparing network data for training: {e}")
                else:
                    self.anomaly_detector.train(training_features)
            if not kyc_data.empty:
                self.kyc_risk_assessor.train(kyc_data)

            end_time = time.time()
            logger.info(f"Data loaded in {end_time - start_time:.2f} seconds")

    def periodic_update(self):
        """Periodically updates data until `stop()` is called."""
        while self.running:
            # wait() returns True as soon as stop() sets the event, and False
            # when the interval elapsed without a stop request.
            if self._stop_event.wait(self.update_interval_seconds):
                break
            try:
                self.load_data()
            except Exception as e:
                # Keep the updater alive; the next cycle can try again.
                logger.error(f"Periodic data update failed: {e}")

    def stop(self):
        """Stop the background update thread."""
        self.running = False
        self._stop_event.set()  # Wake the thread if it is waiting
        self.update_thread.join()  # Wait for the thread to finish

    def process_transaction(self, transaction):
        """
        Processes a new transaction.

        Args:
            transaction (dict): A dictionary representing the transaction,
                including 'sender', 'receiver', 'amount', 'timestamp', and any
                other relevant features.

        Returns:
            tuple: (status, message) where status is one of "Malicious",
            "Anomalous", "Normal" or "Error". Only "Normal" transactions are
            appended to `transaction_history`.
        """
        try:
            # 1. Basic validation
            if not all(key in transaction for key in ['sender', 'receiver', 'amount', 'timestamp']):
                raise ValueError("Invalid transaction format.")
            # 2. Check for malicious addresses
            is_malicious, reason = self.malicious_address_detector.check_transaction(
                transaction['sender'], transaction['receiver'])
            if is_malicious:
                logger.warning(f"Malicious transaction detected: {reason} - {transaction}")
                return "Malicious", reason

            # 3. Anomaly detection
            # NOTE(review): the frame holds a single row, so the per-sender
            # 'time_since_last_transaction' computed here is always 0.
            transaction_df = create_transaction_features(pd.DataFrame([transaction]))
            features = self._anomaly_features(transaction_df)
            # `estimators_` only exists on a fitted forest. Without a trained
            # model there is no score to judge, so the transaction must not be
            # reported as anomalous.
            if hasattr(self.anomaly_detector.model, 'estimators_'):
                anomaly_score = self.anomaly_detector.predict(features)[0]
                if self.anomaly_detector.is_anomalous(anomaly_score):
                    logger.warning(f"Anomalous transaction detected: score={anomaly_score:.3f} - {transaction}")
                    return "Anomalous", f"Anomaly score: {anomaly_score:.3f}"
            else:
                logger.warning("Anomaly model not trained. Skipping anomaly detection.")

            # 4. Store the transaction (for potential future analysis)
            self.transaction_history.append(transaction)
            return "Normal", "Transaction processed"

        except ValueError as ve:
            logger.error(f"Error processing transaction: {ve} - {transaction}")
            return "Error", str(ve)
        except Exception as e:
            logger.error(f"Unexpected error processing transaction: {e} - {transaction}")
            return "Error", "Unexpected error"

    def assess_user_risk(self, user_data):
        """
        Assesses the risk associated with a user based on their KYC data.

        Args:
            user_data (pd.DataFrame): DataFrame containing the user's KYC information.

        Returns:
            tuple: (risk_level, message) as produced by KYCRiskAssessor.assess_risk.
        """
        return self.kyc_risk_assessor.assess_risk(user_data)

    def get_status(self):
        """
        Returns the current status of the XMR Guardian AI.  Useful for monitoring.

        Returns:
            dict: Number of malicious addresses loaded, whether each model is
            trained, and the number of stored transactions.
        """
        status = {
            "malicious_addresses_loaded": len(self.malicious_address_detector.malicious_addresses),
            "anomaly_model_trained": hasattr(self.anomaly_detector.model, 'estimators_'),
            "kyc_model_trained": self.kyc_risk_assessor.trained,
            "transaction_history_size": len(self.transaction_history),
        }
        return status

# ===============================
# Example Usage
# ===============================

if __name__ == "__main__":
    # Demo run. Constructing XMRGuardianAI loads the three default CSV files,
    # trains whatever models it has data for, and starts the background
    # update thread.
    guardian_ai = XMRGuardianAI()

    # Example transaction (replace with real data). 'sender', 'receiver',
    # 'amount' and 'timestamp' are the keys process_transaction validates;
    # the balances are additional entries of TRANSACTION_FEATURES.
    example_transaction = {
        'sender': 'address1',
        'receiver': 'address2',
        'amount': 1.234,
        'timestamp': '2024-07-24 12:00:00',
        'sender_balance': 100.0,
        'receiver_balance': 20.0,
    }

    # Process the transaction; the result is a (status, message) pair.
    status, message = guardian_ai.process_transaction(example_transaction)
    print(f"Transaction Status: {status}, Message: {message}")

    # Example of a potentially anomalous transaction
    anomalous_transaction = {
        'sender': 'address3',
        'receiver': 'address4',
        'amount': 10000.0,  # Very large amount
        'timestamp': '2024-07-24 12:05:00',
        'sender_balance': 15000,
        'receiver_balance': 500,
    }
    status, message = guardian_ai.process_transaction(anomalous_transaction)
    print(f"Transaction Status: {status}, Message: {message}")

    # Example KYC data (replace with real data): a one-row DataFrame that
    # includes the 'name' and 'email' columns read by create_kyc_features.
    example_kyc_data = pd.DataFrame([{
        'user_id': 'user123',
        'name': 'John Smith',
        'email': 'john.smith@example.com',
        'country': 'USA',
        'dob': '1990-05-15',
    }])
    risk_level, risk_message = guardian_ai.assess_user_risk(example_kyc_data)
    print(f"User Risk Level: {risk_level}, Message: {risk_message}")

    # Get the AI status (loaded-address count, model flags, history size)
    print("Current Status:", guardian_ai.get_status())

    # Stop the background thread before exiting
    guardian_ai.stop()

ERROR:__main__:Error loading malicious addresses: [Errno 2] No such file or directory: 'malicious_addresses.csv'
ERROR:__main__:Error loading exchange KYC data: [Errno 2] No such file or directory: 'exchange_kyc_data.csv'
ERROR:__main__:Error loading network data: [Errno 2] No such file or directory: 'network_transactions.csv'


Transaction Status: Anomalous, Message: Anomaly score: 0.000
Transaction Status: Anomalous, Message: Anomaly score: 0.000
User Risk Level: 0, Message: KYC model not trained
Current Status: {'malicious_addresses_loaded': 0, 'anomaly_model_trained': False, 'kyc_model_trained': False, 'transaction_history_size': 0}
