# Dusting Attack & Address Poisoning Analysis Notebook

This notebook analyzes dusting attacks and address poisoning attempts on the Solana blockchain, particularly focusing on techniques documented by Chainalysis and other security researchers.

## Overview

### Dusting Attacks
A dusting attack sends tiny amounts of cryptocurrency ("dust") to many addresses. When recipients later move the dust, the attacker traces those movements to link addresses together and erode their owners' privacy.
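
As a minimal sketch of the idea, dust can be flagged as incoming transfers whose value falls below a small USD threshold. The field names (`direction`, `amount_usd`) and the sample rows are hypothetical and chosen only for illustration. The 0.1 USD default mirrors the threshold that appears in the collector logs later in this notebook, but this is not the collector's actual implementation.

```python
def flag_dust(transfers, threshold_usd=0.1):
    """Return incoming transfers worth less than threshold_usd (hypothetical schema)."""
    return [
        t for t in transfers
        if t["direction"] == "in" and t["amount_usd"] < threshold_usd
    ]

# Made-up sample data for illustration only
sample_transfers = [
    {"sender": "A", "direction": "in", "amount_usd": 0.001},
    {"sender": "B", "direction": "in", "amount_usd": 25.0},
    {"sender": "C", "direction": "out", "amount_usd": 0.002},
    {"sender": "D", "direction": "in", "amount_usd": 0.05},
]

dust = flag_dust(sample_transfers)
print([t["sender"] for t in dust])
```

A value threshold alone will also catch harmless airdrops and refunds, so it is best treated as a first-pass filter rather than proof of an attack.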

### Address Poisoning
Address poisoning is a technique where attackers create addresses that look like one of a victim's frequent contacts, typically by matching the first and last few characters. They then send small transactions so the look-alike address appears in the victim's transaction history, hoping the victim will later copy it by mistake and send funds to the attacker.
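
One simple way to express "visually similar" is to count how many leading and trailing characters two addresses share. The sketch below is an assumption about how such a check could look, not the scoring used by the collectors in this notebook; the function names, the default of four characters, and the sample strings are all made up for illustration.

```python
def shared_prefix_len(a, b):
    """Count how many leading characters a and b have in common."""
    n = 0
    for x, y in zip(a, b):
        if x != y:
            break
        n += 1
    return n

def looks_like(candidate, known, min_prefix=4, min_suffix=4):
    """True if candidate differs from known but shares its first and last characters."""
    if candidate == known:
        return False
    prefix = shared_prefix_len(candidate, known)
    suffix = shared_prefix_len(candidate[::-1], known[::-1])
    return prefix >= min_prefix and suffix >= min_suffix

# Made-up strings, not real addresses
known_contact = "ABCD1111111111WXYZ"
lookalike = "ABCD2222222222WXYZ"
unrelated = "QRST2222222222WXYZ"

print(looks_like(lookalike, known_contact))
print(looks_like(unrelated, known_contact))
```

Wallet interfaces often abbreviate addresses to their first and last characters, which is why a prefix and suffix comparison is a reasonable starting heuristic.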

In [17]:
# Import required libraries
import os
import sys
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta

# Add project root to Python path
project_root = os.path.abspath(os.path.join(os.getcwd(), '../..'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

# Import SolanaGuard modules
from data_collection.collectors.helius_collector import HeliusCollector
from data_collection.collectors.range_collector import RangeCollector

# Try to import utils - similar approach as in mixer_detection notebook
try:
    from utils.address_utils import detect_dusting_and_poisoning
    print("Utils modules imported successfully")
except ImportError:
    try:
        from data_collection.utils.address_utils import detect_dusting_and_poisoning
        print("Utils modules imported from data_collection.utils")
    except ImportError:
        # Define placeholder function if module can't be found
        print("WARNING: Could not import address_utils module. Using placeholder functions.")
        
        def detect_dusting_and_poisoning(address, token_transfers, dusting_attacks, address_poisoning):
            """Placeholder function for detecting dusting and poisoning attacks."""
            return {
                "address": address,
                "dusting_detected": not dusting_attacks.empty if hasattr(dusting_attacks, 'empty') else bool(dusting_attacks),
                "poisoning_detected": not address_poisoning.empty if hasattr(address_poisoning, 'empty') else bool(address_poisoning),
                "dusting_attacks": dusting_attacks,
                "poisoning_attempts": address_poisoning
            }

# Configure plot style
plt.style.use('ggplot')
sns.set(style="whitegrid")

Utils modules imported from data_collection.utils


## Initialize API Collectors

First, we initialize the necessary API collectors to gather data from various sources:

In [18]:
# Initialize collectors
helius = HeliusCollector()
range_api = RangeCollector()

print("Collectors initialized successfully")

2025-04-24 23:33:06,385 - helius_collector - INFO - Initialized Helius collector
2025-04-24 23:33:06,387 - range_collector - INFO - Initialized Range collector


Collectors initialized successfully
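
The collectors initialize without complaint even when no credentials are configured. A missing key only surfaces later, as `401 Unauthorized` responses with `api-key=None` in the request URL. A short pre-flight check can catch this before any request is sent. The sketch below assumes the key is supplied through an environment variable named `HELIUS_API_KEY`; that name is a guess, so substitute whatever your `HeliusCollector` configuration actually reads.

```python
import os

def require_env(name):
    """Return the value of an environment variable or raise if it is missing."""
    value = os.environ.get(name, "").strip()
    # Treat the literal string "None" as missing too, since that is what
    # ends up in the request URL when the key is unset.
    if not value or value.lower() == "none":
        raise RuntimeError(
            f"Environment variable {name} is not set; API requests would fail with 401."
        )
    return value

# HELIUS_API_KEY is a hypothetical variable name used for illustration
try:
    require_env("HELIUS_API_KEY")
    print("API key found")
except RuntimeError as exc:
    print(f"Pre-flight check failed: {exc}")
```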


## Identify Target Addresses

Let's identify addresses that might be targets of dusting attacks or address poisoning. We'll look at high-value wallets and frequent traders:

In [19]:
# Real Solana addresses to analyze for dusting attacks
target_addresses = [
    "Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD", # Prominent NFT collector address
    "dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH",  # High-value DeFi user
    "AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8"   # Exchange hot wallet (contains 'I', which is not in the Base58 alphabet; verify this address)
]

# Function to analyze an address for dusting and poisoning
def analyze_address_attacks(address):
    print(f"\nAnalyzing address: {address}")
    
    # Step 1: Get transaction history
    try:
        tx_history = helius.fetch_transaction_history(address, limit=200)
        print(f"Fetched {len(tx_history)} transactions")
    except Exception as e:
        print(f"Error fetching transaction history: {e}")
        tx_history = pd.DataFrame()
    
    # Step 2: Get token transfers
    try:
        token_transfers = helius.analyze_token_transfers(address, limit=200)
        print(f"Fetched {len(token_transfers)} token transfers")
    except Exception as e:
        print(f"Error analyzing token transfers: {e}")
        token_transfers = pd.DataFrame()
    
    # Step 3: Detect dusting attacks
    try:
        dusting_attacks = helius.detect_dusting_attacks(address)
        print(f"Detected {len(dusting_attacks)} potential dusting attacks")
    except Exception as e:
        print(f"Error detecting dusting attacks: {e}")
        dusting_attacks = pd.DataFrame()
    
    # Step 4: Detect address poisoning
    try:
        address_poisoning = helius.detect_address_poisoning(address)
        print(f"Detected {len(address_poisoning)} potential address poisoning attempts")
    except Exception as e:
        print(f"Error detecting address poisoning: {e}")
        address_poisoning = pd.DataFrame()
    
    results = {
        "address": address,
        "tx_history": tx_history,
        "token_transfers": token_transfers,
        "dusting_attacks": dusting_attacks,
        "address_poisoning": address_poisoning
    }
    
    return results

# Analyze each address
analysis_results = {}
for address in target_addresses:
    analysis_results[address] = analyze_address_attacks(address)

2025-04-24 23:33:06,406 - helius_collector - INFO - Fetching transaction history for Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD (limit: 200)
2025-04-24 23:33:06,407 - helius_collector - INFO - Getting signatures for Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD (limit: 100)
2025-04-24 23:33:06,529 - helius_collector - ERROR - Failed to make RPC request: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None
2025-04-24 23:33:06,541 - helius_collector - INFO - Analyzing token transfers for Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD
2025-04-24 23:33:06,542 - helius_collector - INFO - Fetching transaction history for Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD (limit: 200)
2025-04-24 23:33:06,543 - helius_collector - INFO - Getting signatures for Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD (limit: 100)
2025-04-24 23:33:06,52


Analyzing address: Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD
Error fetching transaction history: Request failed: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None


2025-04-24 23:33:06,656 - helius_collector - ERROR - Failed to make RPC request: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None
2025-04-24 23:33:06,658 - helius_collector - INFO - Detecting dusting attacks for Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD (threshold: 0.1 USD)
2025-04-24 23:33:06,659 - helius_collector - INFO - Analyzing token transfers for Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD
2025-04-24 23:33:06,661 - helius_collector - INFO - Fetching transaction history for Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD (limit: 1000)
2025-04-24 23:33:06,662 - helius_collector - INFO - Getting signatures for Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD (limit: 100)

Error analyzing token transfers: Request failed: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None


2025-04-24 23:33:06,861 - helius_collector - INFO - Detecting address poisoning for Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD
2025-04-24 23:33:06,862 - helius_collector - INFO - Fetching transaction history for Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD (limit: 1000)
2025-04-24 23:33:06,863 - helius_collector - INFO - Getting signatures for Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD (limit: 100)
2025-04-24 23:33:07,052 - helius_collector - ERROR - Failed to make RPC request: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None

Error detecting dusting attacks: Request failed: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None


2025-04-24 23:33:07,054 - helius_collector - INFO - Fetching transaction history for dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH (limit: 200)
2025-04-24 23:33:07,055 - helius_collector - INFO - Getting signatures for dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH (limit: 100)
2025-04-24 23:33:07,258 - helius_collector - ERROR - Failed to make RPC request: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None


Error detecting address poisoning: Request failed: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None

Analyzing address: dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH


2025-04-24 23:33:07,260 - helius_collector - INFO - Analyzing token transfers for dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH
2025-04-24 23:33:07,261 - helius_collector - INFO - Fetching transaction history for dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH (limit: 200)
2025-04-24 23:33:07,261 - helius_collector - INFO - Getting signatures for dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH (limit: 100)


Error fetching transaction history: Request failed: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None


2025-04-24 23:33:07,481 - helius_collector - ERROR - Failed to make RPC request: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None
2025-04-24 23:33:07,484 - helius_collector - INFO - Detecting dusting attacks for dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH (threshold: 0.1 USD)
2025-04-24 23:33:07,485 - helius_collector - INFO - Analyzing token transfers for dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH
2025-04-24 23:33:07,486 - helius_collector - INFO - Fetching transaction history for dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH (limit: 1000)
2025-04-24 23:33:07,486 - helius_collector - INFO - Getting signatures for dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH (limit: 100)

Error analyzing token transfers: Request failed: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None
Error detecting dusting attacks: Request failed: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None


2025-04-24 23:33:07,854 - helius_collector - ERROR - Failed to make RPC request: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None
2025-04-24 23:33:07,856 - helius_collector - INFO - Fetching transaction history for AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8 (limit: 200)
2025-04-24 23:33:07,857 - helius_collector - INFO - Getting signatures for AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8 (limit: 100)
2025-04-24 23:33:08,045 - helius_collector - ERROR - Failed to make RPC request: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None

Error detecting address poisoning: Request failed: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None

Analyzing address: AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8


2025-04-24 23:33:08,046 - helius_collector - INFO - Analyzing token transfers for AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8
2025-04-24 23:33:08,047 - helius_collector - INFO - Fetching transaction history for AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8 (limit: 200)
2025-04-24 23:33:08,048 - helius_collector - INFO - Getting signatures for AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8 (limit: 100)
2025-04-24 23:33:08,249 - helius_collector - ERROR - Failed to make RPC request: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None

Error fetching transaction history: Request failed: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None


2025-04-24 23:33:08,250 - helius_collector - INFO - Detecting dusting attacks for AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8 (threshold: 0.1 USD)
2025-04-24 23:33:08,251 - helius_collector - INFO - Analyzing token transfers for AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8
2025-04-24 23:33:08,253 - helius_collector - INFO - Fetching transaction history for AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8 (limit: 1000)
2025-04-24 23:33:08,253 - helius_collector - INFO - Getting signatures for AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8 (limit: 100)
2025-04-24 23:33:08,447 - helius_collector - ERROR 

Error analyzing token transfers: Request failed: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None


2025-04-24 23:33:08,449 - helius_collector - INFO - Detecting address poisoning for AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8
2025-04-24 23:33:08,450 - helius_collector - INFO - Fetching transaction history for AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8 (limit: 1000)
2025-04-24 23:33:08,451 - helius_collector - INFO - Getting signatures for AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8 (limit: 100)


Error detecting dusting attacks: Request failed: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None


2025-04-24 23:33:08,655 - helius_collector - ERROR - Failed to make RPC request: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None


Error detecting address poisoning: Request failed: 401 Client Error: Unauthorized for url: https://mainnet.helius-rpc.com/?api-key=None
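
Solana addresses are Base58-encoded, and the Base58 alphabet omits the easily confused characters `0`, `O`, `I`, and `l`. A quick character check on the `target_addresses` list can catch typos before any request is made. The helper names below are illustrative, and the length bounds (32 to 44 characters) are the usual range for an encoded 32-byte public key. This is a sanity check only; it does not decode the address or confirm that the account exists.

```python
BASE58_ALPHABET = set("123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz")

def invalid_base58_chars(address):
    """Return the sorted list of characters that are not valid Base58."""
    return sorted(set(address) - BASE58_ALPHABET)

def looks_like_solana_address(address):
    """Cheap plausibility check: expected length and only Base58 characters."""
    return 32 <= len(address) <= 44 and not invalid_base58_chars(address)

# The three addresses from the target_addresses list above
candidates = [
    "Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD",
    "dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH",
    "AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8",
]

for addr in candidates:
    if looks_like_solana_address(addr):
        print(f"{addr}: plausible")
    else:
        print(f"{addr}: suspect, invalid characters {invalid_base58_chars(addr)}")
```

Run against this notebook's targets, the check flags the third address because it contains an uppercase `I`.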


## Analyze Dusting Attacks

Now, let's analyze the detected dusting attacks in more detail:

In [20]:
# Function to analyze dusting attacks
def analyze_dusting_patterns(address, dusting_data):
    if dusting_data.empty:
        print(f"No dusting attacks detected for {address}")
        return None
    
    print(f"\nAnalyzing dusting attacks for {address}:")
    print(f"Found {len(dusting_data)} suspicious dust transfers")
    
    # Analyze dust by token
    if "mint" in dusting_data.columns:
        dust_by_token = dusting_data["mint"].value_counts().reset_index()
        dust_by_token.columns = ["mint", "count"]
        
        print("\nDust by token:")
        for _, row in dust_by_token.iterrows():
            print(f"- {row['mint']}: {row['count']} dusting transactions")
    
    # Analyze dust amount distribution
    if "amount" in dusting_data.columns:
        print("\nDust amount statistics:")
        print(f"- Min amount: {dusting_data['amount'].min()}")
        print(f"- Max amount: {dusting_data['amount'].max()}")
        print(f"- Mean amount: {dusting_data['amount'].mean()}")
        
        # Plot dust amount distribution
        plt.figure(figsize=(10, 6))
        sns.histplot(dusting_data["amount"], kde=True)
        plt.title(f"Dust Amount Distribution for {address}")
        plt.xlabel("Amount")
        plt.ylabel("Frequency")
        plt.tight_layout()
        plt.show()
    
    # Analyze temporal patterns
    if "block_time" in dusting_data.columns:
        # Work on a sorted copy so the caller's DataFrame is not modified in place
        dusting_data = dusting_data.assign(
            datetime=pd.to_datetime(dusting_data["block_time"], unit="s")
        ).sort_values("datetime")
        
        # Calculate intervals between dust transfers
        dusting_data["time_diff"] = dusting_data["datetime"].diff().dt.total_seconds()
        
        print("\nTemporal patterns:")
        print(f"- First dust: {dusting_data['datetime'].min()}")
        print(f"- Last dust: {dusting_data['datetime'].max()}")
        print(f"- Duration: {(dusting_data['datetime'].max() - dusting_data['datetime'].min()).total_seconds() / 3600:.2f} hours")
        
        # Check for regular intervals (possible automated dusting)
        if len(dusting_data) > 3:
            time_diffs = dusting_data["time_diff"].dropna()
            if len(time_diffs) > 0:
                mean_diff = time_diffs.mean()
                std_diff = time_diffs.std()
                cv = std_diff / mean_diff if mean_diff > 0 else 0
                
                print(f"- Mean interval: {mean_diff:.2f} seconds")
                print(f"- Interval std dev: {std_diff:.2f} seconds")
                print(f"- Coefficient of variation: {cv:.4f}")
                
                # Low CV suggests regular intervals (automated)
                if cv < 0.3 and len(time_diffs) > 5:
                    print("⚠️ Regular intervals detected - likely automated dusting attack")
        
        # Plot dust over time (only possible when amounts are available)
        if "amount" in dusting_data.columns:
            plt.figure(figsize=(12, 6))
            plt.scatter(dusting_data["datetime"], dusting_data["amount"], alpha=0.7)
            plt.title(f"Dust Transfers Over Time for {address}")
            plt.xlabel("Date")
            plt.ylabel("Amount")
            plt.grid(True, alpha=0.3)
            plt.tight_layout()
            plt.show()
    
    return {
        "address": address,
        "dust_count": len(dusting_data),
        "dust_by_token": dust_by_token.to_dict("records") if "mint" in dusting_data.columns else [],
        "amount_stats": {
            "min": dusting_data["amount"].min() if "amount" in dusting_data.columns else 0,
            "max": dusting_data["amount"].max() if "amount" in dusting_data.columns else 0,
            "mean": dusting_data["amount"].mean() if "amount" in dusting_data.columns else 0
        },
        "temporal_stats": {
            "first_dust": dusting_data["datetime"].min() if "block_time" in dusting_data.columns else None,
            "last_dust": dusting_data["datetime"].max() if "block_time" in dusting_data.columns else None,
            # Short-circuit order matters: time_diffs and cv exist only when the earlier checks pass
            "is_automated": bool(len(dusting_data) > 3 and len(time_diffs) > 5 and cv < 0.3)
        } if "block_time" in dusting_data.columns else {}
    }

# Analyze dusting patterns for each address
dusting_analyses = {}
for address, data in analysis_results.items():
    dusting_analyses[address] = analyze_dusting_patterns(address, data["dusting_attacks"])

No dusting attacks detected for Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD
No dusting attacks detected for dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH
No dusting attacks detected for AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8
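
The regular-interval check in `analyze_dusting_patterns` rests on the coefficient of variation (CV): the standard deviation of the gaps between transfers divided by their mean. The sketch below reproduces that calculation with the standard library on two made-up timestamp series. The 0.3 cutoff matches the cell above, while the function name and sample data are illustrative.

```python
from statistics import mean, stdev

def interval_cv(timestamps):
    """Coefficient of variation of the gaps between sorted Unix timestamps."""
    ts = sorted(timestamps)
    gaps = [later - earlier for earlier, later in zip(ts, ts[1:])]
    if len(gaps) < 2:
        return None  # Not enough intervals to estimate a standard deviation
    avg = mean(gaps)
    # stdev() is the sample standard deviation, like pandas' default .std()
    return stdev(gaps) / avg if avg > 0 else 0.0

# Made-up data: one series fires roughly every minute, the other is erratic
regular = [0, 60, 121, 180, 241, 300, 360]
irregular = [0, 5, 400, 410, 3000, 3100, 9000]

for name, series in [("regular", regular), ("irregular", irregular)]:
    cv = interval_cv(series)
    label = "likely automated" if cv < 0.3 else "no regular pattern"
    print(f"{name}: CV = {cv:.4f} ({label})")
```

A low CV means the gaps cluster tightly around their mean, which is what a scripted sender on a fixed schedule would produce.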


## Analyze Address Poisoning

Now, let's analyze the detected address poisoning attempts:

In [21]:
# Function to analyze address poisoning
def analyze_poisoning_patterns(address, poisoning_data):
    if poisoning_data.empty:
        print(f"No address poisoning detected for {address}")
        return None
    
    print(f"\nAnalyzing address poisoning for {address}:")
    print(f"Found {len(poisoning_data)} suspicious look-alike addresses")
    
    # Analyze similarity scores
    if "similarity_score" in poisoning_data.columns:
        print("\nSimilarity score statistics:")
        print(f"- Min similarity: {poisoning_data['similarity_score'].min():.2f}")
        print(f"- Max similarity: {poisoning_data['similarity_score'].max():.2f}")
        print(f"- Mean similarity: {poisoning_data['similarity_score'].mean():.2f}")
        
        # Plot similarity score distribution
        plt.figure(figsize=(10, 6))
        sns.histplot(poisoning_data["similarity_score"], kde=True)
        plt.title(f"Address Similarity Score Distribution for {address}")
        plt.xlabel("Similarity Score")
        plt.ylabel("Frequency")
        plt.tight_layout()
        plt.show()
    
    # List similar addresses and their similarity scores
    if "similar_address" in poisoning_data.columns and "similarity_score" in poisoning_data.columns:
        poisoning_data = poisoning_data.sort_values("similarity_score", ascending=False)
        
        print("\nTop similar addresses:")
        for i, (_, row) in enumerate(poisoning_data.head(10).iterrows()):
            print(f"- {row['similar_address']} (similarity: {row['similarity_score']:.2f})")
            
            # Display address comparison
            print(f"  Original: {address}")
            print(f"  Similar:  {row['similar_address']}")
            
            # Mark differing characters with '^', aligned under the addresses printed above
            similar = row['similar_address']
            indicator = "".join(" " if a == b else "^" for a, b in zip(address, similar))
            print(" " * len("  Original: ") + indicator)
    
    # Analyze temporal patterns
    if "block_time" in poisoning_data.columns:
        # Work on a sorted copy so the caller's DataFrame is not modified in place
        poisoning_data = poisoning_data.assign(
            datetime=pd.to_datetime(poisoning_data["block_time"], unit="s")
        ).sort_values("datetime")
        
        print("\nTemporal patterns:")
        print(f"- First poisoning attempt: {poisoning_data['datetime'].min()}")
        print(f"- Last poisoning attempt: {poisoning_data['datetime'].max()}")
        print(f"- Duration: {(poisoning_data['datetime'].max() - poisoning_data['datetime'].min()).total_seconds() / 3600:.2f} hours")
        
        # Plot poisoning attempts over time
        plt.figure(figsize=(12, 6))
        plt.scatter(poisoning_data["datetime"], poisoning_data["similarity_score"] if "similarity_score" in poisoning_data.columns else [1] * len(poisoning_data), alpha=0.7)
        plt.title(f"Address Poisoning Attempts Over Time for {address}")
        plt.xlabel("Date")
        plt.ylabel("Similarity Score")
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
    
    return {
        "address": address,
        "poisoning_count": len(poisoning_data),
        "similar_addresses": poisoning_data["similar_address"].tolist() if "similar_address" in poisoning_data.columns else [],
        "similarity_stats": {
            "min": poisoning_data["similarity_score"].min() if "similarity_score" in poisoning_data.columns else 0,
            "max": poisoning_data["similarity_score"].max() if "similarity_score" in poisoning_data.columns else 0,
            "mean": poisoning_data["similarity_score"].mean() if "similarity_score" in poisoning_data.columns else 0
        } if "similarity_score" in poisoning_data.columns else {},
        "temporal_stats": {
            "first_attempt": poisoning_data["datetime"].min() if "block_time" in poisoning_data.columns else None,
            "last_attempt": poisoning_data["datetime"].max() if "block_time" in poisoning_data.columns else None
        } if "block_time" in poisoning_data.columns else {}
    }

# Analyze poisoning patterns for each address
poisoning_analyses = {}
for address, data in analysis_results.items():
    poisoning_analyses[address] = analyze_poisoning_patterns(address, data["address_poisoning"])

No address poisoning detected for Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD
No address poisoning detected for dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH
No address poisoning detected for AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8
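
The character-difference indicator printed by `analyze_poisoning_patterns` can be isolated into a small helper for reuse or testing. This sketch uses `itertools.zip_longest` so that a length mismatch is also marked, which differs slightly from the cell above (it stops at the shorter address). The function name and the sample strings are made up for illustration.

```python
from itertools import zip_longest

def diff_markers(original, candidate):
    """Return a string with '^' under each position where the inputs differ."""
    return "".join(
        " " if a == b else "^"
        for a, b in zip_longest(original, candidate)
    )

# Made-up strings, not real addresses
original = "ABCD1234WXYZ"
lookalike = "ABCD1934WXYZ"

print(f"Original: {original}")
print(f"Similar:  {lookalike}")
print(f"          {diff_markers(original, lookalike)}")
```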


## Combine Detection Results

Let's combine our dusting and poisoning detection results for a comprehensive analysis:

In [22]:
# Combine dusting and poisoning detection results
for address, data in analysis_results.items():
    # Use the utility function to combine detection results
    combined_analysis = detect_dusting_and_poisoning(
        address,
        data["token_transfers"],
        data["dusting_attacks"],
        data["address_poisoning"]
    )
    
    print(f"\nCombined analysis for {address}:")
    print(f"- Dusting detected: {combined_analysis['dusting_detected']}")
    print(f"- Poisoning detected: {combined_analysis['poisoning_detected']}")
    
    # Show detailed attack information
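    if combined_analysis["dusting_detected"]:
        print("\nDusting attacks:")
        attacks = combined_analysis["dusting_attacks"]
        if isinstance(attacks, pd.DataFrame):
            # Iterating a DataFrame yields column names, so convert rows to dicts first
            attacks = attacks.to_dict("records")
        for attack in attacks[:3]:  # Show top 3
            if isinstance(attack, dict) and "type" in attack:
                print(f"- {attack['type']}: {attack.get('count', 0)} instances")
            else:
                print(f"- Attack details: {attack}")
    
    if combined_analysis["poisoning_detected"]:
        print("\nPoisoning attempts:")
        attempts = combined_analysis["poisoning_attempts"]
        if isinstance(attempts, pd.DataFrame):
            # Same conversion as above so each item is one row, not a column name
            attempts = attempts.to_dict("records")
        for attempt in attempts[:3]:  # Show top 3
            if isinstance(attempt, dict) and "similar_address" in attempt:
                print(f"- Similar address: {attempt['similar_address']} (score: {attempt.get('similarity_score', 0):.2f})")
            else:
                print(f"- Attempt details: {attempt}")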


Combined analysis for Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD:
- Dusting detected: False
- Poisoning detected: False

Combined analysis for dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH:
- Dusting detected: False
- Poisoning detected: False

Combined analysis for AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8:
- Dusting detected: False
- Poisoning detected: False


## Identify Attack Patterns

Let's identify common patterns across the attacks we've detected:

In [23]:
# Identify common attack patterns
def identify_attack_patterns():
    # Combine all dusting attacks
    all_dusting = []
    for address, data in analysis_results.items():
        if not data["dusting_attacks"].empty:
            dusting = data["dusting_attacks"].copy()
            dusting["target_address"] = address
            all_dusting.append(dusting)
    
    dusting_df = pd.concat(all_dusting) if all_dusting else pd.DataFrame()
    
    # Combine all poisoning attempts
    all_poisoning = []
    for address, data in analysis_results.items():
        if not data["address_poisoning"].empty:
            poisoning = data["address_poisoning"].copy()
            poisoning["target_address"] = address
            all_poisoning.append(poisoning)
    
    poisoning_df = pd.concat(all_poisoning) if all_poisoning else pd.DataFrame()
    
    # Analyze common patterns
    patterns = {
        "dusting": {
            "total_count": len(dusting_df),
            "targets_affected": dusting_df["target_address"].nunique() if not dusting_df.empty else 0,
            "patterns": []
        },
        "poisoning": {
            "total_count": len(poisoning_df),
            "targets_affected": poisoning_df["target_address"].nunique() if not poisoning_df.empty else 0,
            "patterns": []
        }
    }
    
    # Identify dusting patterns
    if not dusting_df.empty and "mint" in dusting_df.columns:
        # Common tokens used for dusting
        common_dust_tokens = dusting_df["mint"].value_counts().reset_index()
        common_dust_tokens.columns = ["mint", "count"]
        
        patterns["dusting"]["common_tokens"] = common_dust_tokens.to_dict("records")
        
        # Typical dust amounts
        if "amount" in dusting_df.columns:
            patterns["dusting"]["amount_stats"] = {
                "min": dusting_df["amount"].min(),
                "max": dusting_df["amount"].max(),
                "mean": dusting_df["amount"].mean()
            }
            
            # Calculate common dust amounts (cluster analysis)
            from sklearn.cluster import KMeans
            
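            amounts = dusting_df["amount"].values.reshape(-1, 1)
            if len(amounts) > 5:  # Need enough samples for clustering
                # Cap the cluster count at the number of distinct amounts
                n_clusters = min(5, len(np.unique(amounts)))
                kmeans = KMeans(n_clusters=n_clusters, n_init=10, random_state=0).fit(amounts)
                
                # Get cluster centers and counts
                centers = kmeans.cluster_centers_.flatten()
                labels = kmeans.labels_
                # minlength keeps counts aligned with centers even if a cluster is empty
                cluster_counts = np.bincount(labels, minlength=len(centers))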
                
                clusters = []
                for i in range(len(centers)):
                    clusters.append({
                        "amount": centers[i],
                        "count": int(cluster_counts[i])
                    })
                
                patterns["dusting"]["amount_clusters"] = clusters
        
        # Check for coordinated attacks (same block time)
        if "block_time" in dusting_df.columns:
            block_time_counts = dusting_df["block_time"].value_counts()
            coordinated_times = block_time_counts[block_time_counts > 1].index.tolist()
            
            if coordinated_times:
                coordinated_attacks = []
                
                for time in coordinated_times:
                    attacks = dusting_df[dusting_df["block_time"] == time]
                    
                    coordinated_attacks.append({
                        "time": pd.to_datetime(time, unit="s"),
                        "target_count": attacks["target_address"].nunique(),
                        "attack_count": len(attacks)
                    })
                
                patterns["dusting"]["coordinated_attacks"] = coordinated_attacks
    
    # Identify poisoning patterns
    if not poisoning_df.empty and "similar_address" in poisoning_df.columns:
        # Check for reused poisoning addresses
        address_counts = poisoning_df["similar_address"].value_counts().reset_index()
        address_counts.columns = ["address", "count"]
        
        reused_addresses = address_counts[address_counts["count"] > 1]
        patterns["poisoning"]["reused_addresses"] = reused_addresses.to_dict("records")
        
        # Check for temporal patterns
        if "block_time" in poisoning_df.columns:
            poisoning_df["datetime"] = pd.to_datetime(poisoning_df["block_time"], unit="s")
            poisoning_df = poisoning_df.sort_values("datetime")
            
            # Group attacks by date
            poisoning_df["date"] = poisoning_df["datetime"].dt.date
            daily_attacks = poisoning_df.groupby("date").size().reset_index()
            daily_attacks.columns = ["date", "count"]
            
            patterns["poisoning"]["daily_attacks"] = daily_attacks.to_dict("records")
            
            # Check for coordinated attacks (same block time)
            block_time_counts = poisoning_df["block_time"].value_counts()
            coordinated_times = block_time_counts[block_time_counts > 1].index.tolist()
            
            if coordinated_times:
                coordinated_attacks = []
                
                for time in coordinated_times:
                    attacks = poisoning_df[poisoning_df["block_time"] == time]
                    
                    coordinated_attacks.append({
                        "time": pd.to_datetime(time, unit="s"),
                        "target_count": attacks["target_address"].nunique(),
                        "attack_count": len(attacks)
                    })
                
                patterns["poisoning"]["coordinated_attacks"] = coordinated_attacks
    
    return patterns

# Identify attack patterns
attack_patterns = identify_attack_patterns()

print("\nOverall Attack Patterns:")
print(f"Dusting: {attack_patterns['dusting']['total_count']} attacks affecting {attack_patterns['dusting']['targets_affected']} addresses")
print(f"Poisoning: {attack_patterns['poisoning']['total_count']} attempts affecting {attack_patterns['poisoning']['targets_affected']} addresses")

# Display common tokens used for dusting
if "common_tokens" in attack_patterns["dusting"]:
    print("\nCommon tokens used for dusting:")
    for token in attack_patterns["dusting"]["common_tokens"][:5]:  # Show top 5
        print(f"- {token['mint']}: {token['count']} instances")

# Display coordinated dusting attacks
if "coordinated_attacks" in attack_patterns["dusting"]:
    print("\nCoordinated dusting attacks:")
    for attack in attack_patterns["dusting"]["coordinated_attacks"][:3]:  # Show top 3
        print(f"- {attack['time']}: {attack['attack_count']} attacks targeting {attack['target_count']} addresses")

# Display reused poisoning addresses
if "reused_addresses" in attack_patterns["poisoning"]:
    print("\nReused poisoning addresses:")
    for addr in attack_patterns["poisoning"]["reused_addresses"][:5]:  # Show top 5
        print(f"- {addr['address']}: used in {addr['count']} poisoning attempts")


Overall Attack Patterns:
Dusting: 0 attacks affecting 0 addresses
Poisoning: 0 attempts affecting 0 addresses

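The run above reports zero attacks, so the coordinated-attack branch never executes on real data. As a minimal sketch of what that check computes, the snippet below applies the same rule (more than one event sharing a `block_time`) to a handful of synthetic events using plain Python instead of pandas. The helper name `find_coordinated`, the toy addresses, and the timestamps are made up for illustration; only the field names `block_time` and `target_address` come from the cell above.

```python
from collections import defaultdict
from datetime import datetime, timezone

def find_coordinated(events):
    """Return groups of events that share a block_time (hypothetical helper)."""
    by_time = defaultdict(list)
    for event in events:
        by_time[event["block_time"]].append(event)

    groups = []
    for block_time in sorted(by_time):
        batch = by_time[block_time]
        if len(batch) > 1:  # same rule as the cell above: more than one event per block time
            groups.append({
                "time": datetime.fromtimestamp(block_time, tz=timezone.utc),
                "target_count": len({e["target_address"] for e in batch}),
                "attack_count": len(batch),
            })
    return groups

# Synthetic events: three share a block time, one stands alone
toy_events = [
    {"block_time": 1700000000, "target_address": "addrA"},
    {"block_time": 1700000000, "target_address": "addrB"},
    {"block_time": 1700000000, "target_address": "addrA"},
    {"block_time": 1700000500, "target_address": "addrC"},
]

toy_groups = find_coordinated(toy_events)
for group in toy_groups:
    print(f"{group['time']}: {group['attack_count']} attacks targeting {group['target_count']} addresses")
```

Only the shared block time is reported: three events hitting two distinct targets. The lone event at the later timestamp is ignored, which matches the `> 1` filter used in `identify_attack_patterns`.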

## Generate Recommendations

Based on the analysis above, we generate per-address recommendations to help users defend against dusting and address poisoning:

In [24]:
# Generate defense recommendations
def generate_defense_recommendations(address, dusting_data, poisoning_data):
    recommendations = {
        "address": address,
        "dusting_recommendations": [],
        "poisoning_recommendations": []
    }
    
    # Dusting attack recommendations
    if dusting_data and dusting_data.get("dust_count", 0) > 0:
        recommendations["dusting_recommendations"] = [
            "Do not move dust tokens - leave them untouched to prevent address linkage",
            "Consider using a dedicated wallet for receiving dust, separate from your main funds",
            "Use a wallet that allows you to 'ignore' or 'hide' small token balances"
        ]
        
        # Add token-specific recommendations
        tokens = dusting_data.get("dust_by_token", [])
        if tokens:
            for token in tokens[:3]:  # Top 3 dusting tokens
                recommendations["dusting_recommendations"].append(
                    f"Be especially cautious with {token['mint']} tokens, which have been used in {token['count']} dusting attempts"
                )
    
    # Address poisoning recommendations
    if poisoning_data and poisoning_data.get("poisoning_count", 0) > 0:
        recommendations["poisoning_recommendations"] = [
            "Copy addresses from a trusted source such as your address book, never from recent transaction history",
            "Verify the entire address, not just the beginning and end",
            "Use your wallet's address book feature to save trusted addresses",
            "Consider using a hardware wallet with display confirmation",
            "Enable transaction previews in your wallet to verify recipients"
        ]
        
        # Add specific warning about similar addresses
        similar_addresses = poisoning_data.get("similar_addresses", [])
        if similar_addresses:
            for addr in similar_addresses[:3]:  # Top 3 similar addresses
                recommendations["poisoning_recommendations"].append(
                    f"Be extremely cautious with address {addr} which resembles your address"
                )
    
    return recommendations

# Generate recommendations for each address
defense_recommendations = {}
for address in target_addresses:
    defense_recommendations[address] = generate_defense_recommendations(
        address,
        dusting_analyses.get(address),
        poisoning_analyses.get(address)
    )
    
    print(f"\nRecommendations for {address}:")
    
    if defense_recommendations[address]["dusting_recommendations"]:
        print("\nDusting Attack Defense:")
        for i, rec in enumerate(defense_recommendations[address]["dusting_recommendations"], 1):
            print(f"{i}. {rec}")
    
    if defense_recommendations[address]["poisoning_recommendations"]:
        print("\nAddress Poisoning Defense:")
        for i, rec in enumerate(defense_recommendations[address]["poisoning_recommendations"], 1):
            print(f"{i}. {rec}")


Recommendations for Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD:

Recommendations for dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH:

Recommendations for AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8:

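No recommendations print above because no attacks were detected for these addresses. To illustrate why the advice to verify the entire address matters, here is a minimal sketch of a prefix/suffix lookalike check. The helper `looks_like`, the four-character window, and the example strings are assumptions made for illustration; this is not the logic used by `detect_dusting_and_poisoning`.

```python
def looks_like(candidate: str, trusted: str, window: int = 4) -> bool:
    """True when two different addresses share the same first and last `window` characters."""
    if candidate == trusted:
        return False  # identical address, not a lookalike
    same_prefix = candidate[:window] == trusted[:window]
    same_suffix = candidate[-window:] == trusted[-window:]
    return same_prefix and same_suffix

# Made-up strings for illustration only; these are not real Solana addresses
trusted_addr = "Gnt2AAAAAAAAAAAAAAAAJotD"
lookalike_addr = "Gnt2BBBBBBBBBBBBBBBBJotD"
unrelated_addr = "Xyz9CCCCCCCCCCCCCCCCQrst"

print(looks_like(lookalike_addr, trusted_addr))
print(looks_like(unrelated_addr, trusted_addr))
```

A wallet view that truncates addresses to their first and last few characters would render the trusted and lookalike strings identically, which is why comparing the full address is the safer habit.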

## Conclusions and Report Generation

Let's summarize our findings and generate a comprehensive report:

In [25]:
# Generate summary report
report = ["# Dusting Attack & Address Poisoning Analysis Report\n"]
report.append(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")

report.append("## Addresses Analyzed\n")
for address in target_addresses:
    # Treat a missing or empty analysis as zero detections
    dusting_count = (dusting_analyses.get(address) or {}).get("dust_count", 0)
    poisoning_count = (poisoning_analyses.get(address) or {}).get("poisoning_count", 0)
    
    report.append(f"- **{address}**: {dusting_count} dusting attacks, {poisoning_count} poisoning attempts")

report.append("\n## Attack Summary\n")
report.append(f"- **Total Dusting Attacks**: {attack_patterns['dusting']['total_count']}")
report.append(f"- **Total Poisoning Attempts**: {attack_patterns['poisoning']['total_count']}")
report.append(f"- **Addresses Affected by Dusting**: {attack_patterns['dusting']['targets_affected']}")
report.append(f"- **Addresses Affected by Poisoning**: {attack_patterns['poisoning']['targets_affected']}\n")

# Add common attack patterns
if "common_tokens" in attack_patterns["dusting"]:
    report.append("### Common Dusting Tokens\n")
    for token in attack_patterns["dusting"]["common_tokens"][:5]:  # Top 5
        report.append(f"- **{token['mint']}**: {token['count']} instances")
    report.append("")

if "coordinated_attacks" in attack_patterns["dusting"]:
    report.append("### Coordinated Dusting Attacks\n")
    for attack in attack_patterns["dusting"]["coordinated_attacks"][:3]:  # Top 3
        report.append(f"- **{attack['time']}**: {attack['attack_count']} attacks targeting {attack['target_count']} addresses")
    report.append("")

if "reused_addresses" in attack_patterns["poisoning"]:
    report.append("### Common Poisoning Addresses\n")
    for addr in attack_patterns["poisoning"]["reused_addresses"][:5]:  # Top 5
        report.append(f"- **{addr['address']}**: used in {addr['count']} poisoning attempts")
    report.append("")

# Detailed analysis per address
for address in target_addresses:
    report.append(f"\n## Detailed Analysis for {address}\n")
    
    # Dusting analysis
    dusting_data = dusting_analyses.get(address)
    if dusting_data:
        report.append("### Dusting Attack Analysis\n")
        report.append(f"- **Number of attacks**: {dusting_data.get('dust_count', 0)}")
        
        # Add amount statistics
        amount_stats = dusting_data.get("amount_stats", {})
        if amount_stats:
            report.append(f"- **Minimum amount**: {amount_stats.get('min', 0)}")
            report.append(f"- **Maximum amount**: {amount_stats.get('max', 0)}")
            report.append(f"- **Average amount**: {amount_stats.get('mean', 0)}")
        
        # Add temporal statistics
        temporal_stats = dusting_data.get("temporal_stats", {})
        if temporal_stats:
            first_dust = temporal_stats.get("first_dust")
            last_dust = temporal_stats.get("last_dust")
            is_automated = temporal_stats.get("is_automated", False)
            
            if first_dust and last_dust:
                report.append(f"- **First dust**: {first_dust}")
                report.append(f"- **Last dust**: {last_dust}")
            
            if is_automated:
                report.append("- **Pattern**: Likely automated dusting attack (regular intervals)")
    else:
        report.append("### Dusting Attack Analysis\n")
        report.append("No dusting attacks detected.")
    
    # Poisoning analysis
    poisoning_data = poisoning_analyses.get(address)
    if poisoning_data:
        report.append("\n### Address Poisoning Analysis\n")
        report.append(f"- **Number of attempts**: {poisoning_data.get('poisoning_count', 0)}")
        
        # Add similarity statistics
        similarity_stats = poisoning_data.get("similarity_stats", {})
        if similarity_stats:
            report.append(f"- **Minimum similarity**: {similarity_stats.get('min', 0):.2f}")
            report.append(f"- **Maximum similarity**: {similarity_stats.get('max', 0):.2f}")
            report.append(f"- **Average similarity**: {similarity_stats.get('mean', 0):.2f}")
        
        # Add similar addresses
        similar_addresses = poisoning_data.get("similar_addresses", [])
        if similar_addresses:
            report.append("\n#### Similar Addresses\n")
            for i, addr in enumerate(similar_addresses[:5], 1):  # Top 5
                report.append(f"{i}. `{addr}`")
    else:
        report.append("\n### Address Poisoning Analysis\n")
        report.append("No address poisoning attempts detected.")
    
    # Recommendations
    recommendations = defense_recommendations.get(address, {})
    if recommendations:
        report.append("\n### Defense Recommendations\n")
        
        dusting_recs = recommendations.get("dusting_recommendations", [])
        if dusting_recs:
            report.append("#### Dusting Attack Defense\n")
            for rec in dusting_recs:
                report.append(f"- {rec}")
            report.append("")
        
        poisoning_recs = recommendations.get("poisoning_recommendations", [])
        if poisoning_recs:
            report.append("#### Address Poisoning Defense\n")
            for rec in poisoning_recs:
                report.append(f"- {rec}")

# Add general recommendations
report.append("\n## General Recommendations\n")
report.append("### Dusting Attack Prevention\n")
report.append("- Use wallets that allow you to ignore or hide small token balances")
report.append("- Consider using different wallets for different purposes (cold storage, trading, etc.)")
report.append("- Monitor your wallets for unexpected token transfers")
report.append("- Be cautious when interacting with unknown tokens or airdrops")
report.append("- Consider using privacy-enhancing tools like Phantom's 'Burn Token' feature\n")

report.append("### Address Poisoning Prevention\n")
report.append("- Always copy and paste addresses, never type them manually")
report.append("- Verify the entire address, not just the beginning and end")
report.append("- Use address book features in wallets to save trusted addresses")
report.append("- Consider using a hardware wallet with visual verification")
report.append("- Use Solana Name Service (SNS) or similar domain services for frequent transactions")
report.append("- Enable transaction previews in your wallet to verify recipients")
report.append("- Double-check addresses before confirming transactions, especially high-value ones")

# Save report to file
report_path = f"../../reports/dusting_poisoning_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
os.makedirs("../../reports", exist_ok=True)

with open(report_path, "w", encoding="utf-8") as f:
    f.write("\n".join(report))

print(f"\nReport generated and saved to {report_path}")

# Display report in notebook
from IPython.display import Markdown, display
display(Markdown("\n".join(report)))


Report generated and saved to ../../reports/dusting_poisoning_report_20250424_233309.md


# Dusting Attack & Address Poisoning Analysis Report

Generated on: 2025-04-24 23:33:09

## Addresses Analyzed

- **Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD**: 0 dusting attacks, 0 poisoning attempts
- **dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH**: 0 dusting attacks, 0 poisoning attempts
- **AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8**: 0 dusting attacks, 0 poisoning attempts

## Attack Summary

- **Total Dusting Attacks**: 0
- **Total Poisoning Attempts**: 0
- **Addresses Affected by Dusting**: 0
- **Addresses Affected by Poisoning**: 0


## Detailed Analysis for Gnt27xtC473ZT2Mw5u8wZ68Z3gULkSTb5DuxJy7eJotD

### Dusting Attack Analysis

No dusting attacks detected.

### Address Poisoning Analysis

No address poisoning attempts detected.

### Defense Recommendations


## Detailed Analysis for dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH

### Dusting Attack Analysis

No dusting attacks detected.

### Address Poisoning Analysis

No address poisoning attempts detected.

### Defense Recommendations


## Detailed Analysis for AAzeEnHVAZzMNZN2xLWfSbo8RgZZ9INENT2VUE8AnX8

### Dusting Attack Analysis

No dusting attacks detected.

### Address Poisoning Analysis

No address poisoning attempts detected.

### Defense Recommendations


## General Recommendations

### Dusting Attack Prevention

- Use wallets that allow you to ignore or hide small token balances
- Consider using different wallets for different purposes (cold storage, trading, etc.)
- Monitor your wallets for unexpected token transfers
- Be cautious when interacting with unknown tokens or airdrops
- Consider using privacy-enhancing tools like Phantom's 'Burn Token' feature

### Address Poisoning Prevention

- Always copy and paste addresses, never type them manually
- Verify the entire address, not just the beginning and end
- Use address book features in wallets to save trusted addresses
- Consider using a hardware wallet with visual verification
- Use Solana Name Service (SNS) or similar domain services for frequent transactions
- Enable transaction previews in your wallet to verify recipients
- Double-check addresses before confirming transactions, especially high-value ones