# Security Test: Attacker (Data Scientist)

This notebook simulates an **ATTACKER** trying to steal secrets from the Data Owner.

**Attack Vectors Tested:**
1. OS command injection via `import os`
2. Subprocess spawning
3. File system access to read secrets
4. `__reduce__` pickle RCE
5. `eval`/`exec` injection
6. Path traversal attacks
7. Exfiltrating private data through return values
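8. `__import__` dynamic import bypass

> **Note:** Step 3 below exercises vectors 1–3, 5, 7, and 8, plus a `__builtins__` escape and a `compile`/`exec` attack. `__reduce__` pickle RCE (4) and path traversal (6) are not exercised in this notebook.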

**ALL attacks should be BLOCKED!**

## Step 1: Setup

In [None]:
# Install pandas and numpy quietly (-q) with uv; the leading "!" runs the line as a shell command from the notebook.
!uv pip install pandas numpy -q

In [None]:
import beaver
from beaver import Twin

# Obtain the beaver context and its currently active session.
bv = beaver.ctx()
session = bv.active_session()
# Force-reset the session before any attack is launched.
# NOTE(review): presumably this clears state left over from earlier runs — confirm against beaver's docs.
session.reset(force=True)

# Show which identity this notebook runs as and who the session peer is.
print(f"Attacker: {bv.user}")
print(f"Victim: {session.peer}")

## Step 2: Wait for Victim's Data

In [None]:
# Wait (timeout=60) for the peer to publish the remote variable "user_data",
# then display the received object and its `.value`.
user_data = session.wait_for_remote_var(
    "user_data",
    timeout=60,
    trust_loader=True,
)
print(f"Got victim's Twin: {user_data}")
print("\nPublic data (safe):")
print(user_data.value)

## Step 3: Launch All Attacks

Send all malicious requests quickly. Each should be blocked by security measures.

In [None]:
# Tallies: requests submitted successfully vs. requests that raised on submission.
attacks_sent = attacks_blocked_on_send = 0

# Attack 1: OS Command Injection
@bv
def attack_os_import(data):
    """Attack payload: import ``os`` to read an environment secret and run a command.

    Args:
        data: Input passed by the caller; not used by this payload.

    Returns:
        A dict with ``'stolen'`` (the ``SECRET_PASSWORD`` environment variable,
        or ``'not found'`` if unset) and ``'whoami'`` (output of ``whoami``).
    """
    import os
    secrets = os.environ.get('SECRET_PASSWORD', 'not found')
    # os.popen runs the command through a shell, so this also probes command execution.
    return {'stolen': secrets, 'whoami': os.popen('whoami').read()}

print("Attack 1: OS import...")
try:
    # Call the decorated payload on the Twin and request a private run of the result.
    result1 = attack_os_import(user_data)
    result1.request_private()
except Exception as exc:
    # Any exception at this point is counted as a rejection at submission time.
    attacks_blocked_on_send += 1
    print(f"  BLOCKED on send: {type(exc).__name__}: {exc}")
else:
    attacks_sent += 1
    print("  Sent (will be blocked on execution)")

# Attack 2: Subprocess Injection
@bv
def attack_subprocess(data):
    """Attack payload: spawn a child process via ``subprocess``.

    Args:
        data: Input passed by the caller; not used by this payload.

    Returns:
        A dict with ``'exfiltrated'`` set to the decoded output of ``whoami``.
    """
    import subprocess
    return {'exfiltrated': subprocess.check_output(['whoami']).decode()}

print("Attack 2: Subprocess...")
try:
    # Call the decorated payload on the Twin and request a private run of the result.
    result2 = attack_subprocess(user_data)
    result2.request_private()
except Exception as exc:
    # Any exception at this point is counted as a rejection at submission time.
    attacks_blocked_on_send += 1
    print(f"  BLOCKED on send: {type(exc).__name__}: {exc}")
else:
    attacks_sent += 1
    print("  Sent (will be blocked on execution)")

# Attack 3: Direct File Read
@bv
def attack_file_read(data):
    """Attack payload: read a system file with the built-in ``open``.

    Args:
        data: Input passed by the caller; not used by this payload.

    Returns:
        A dict with ``'passwd'`` set to the full text of ``/etc/passwd``.
    """
    with open('/etc/passwd', 'r') as f:
        return {'passwd': f.read()}

print("Attack 3: File read...")
try:
    # Call the decorated payload on the Twin and request a private run of the result.
    result3 = attack_file_read(user_data)
    result3.request_private()
except Exception as exc:
    # Any exception at this point is counted as a rejection at submission time.
    attacks_blocked_on_send += 1
    print(f"  BLOCKED on send: {type(exc).__name__}: {exc}")
else:
    attacks_sent += 1
    print("  Sent (will be blocked on execution)")

# Attack 4: Eval Injection
@bv
def attack_eval(data):
    """Attack payload: use ``eval`` on a string that imports ``os`` dynamically.

    Args:
        data: Input passed by the caller; not used by this payload.

    Returns:
        A dict with ``'eval_result'`` set to the current working directory
        obtained through the evaluated expression.
    """
    return {'eval_result': eval("__import__('os').getcwd()")}

print("Attack 4: Eval injection...")
try:
    # Call the decorated payload on the Twin and request a private run of the result.
    result4 = attack_eval(user_data)
    result4.request_private()
except Exception as exc:
    # Any exception at this point is counted as a rejection at submission time.
    attacks_blocked_on_send += 1
    print(f"  BLOCKED on send: {type(exc).__name__}: {exc}")
else:
    attacks_sent += 1
    print("  Sent (will be blocked on execution)")

# Attack 5: Dynamic Import
@bv
def attack_dynamic_import(data):
    """Attack payload: load ``os`` through ``__import__`` instead of an ``import`` statement.

    Args:
        data: Input passed by the caller; not used by this payload.

    Returns:
        A dict with ``'cwd'`` set to the current working directory.
    """
    os_mod = __import__('os')
    return {'cwd': os_mod.getcwd()}

print("Attack 5: Dynamic import...")
try:
    # Call the decorated payload on the Twin and request a private run of the result.
    result5 = attack_dynamic_import(user_data)
    result5.request_private()
except Exception as exc:
    # Any exception at this point is counted as a rejection at submission time.
    attacks_blocked_on_send += 1
    print(f"  BLOCKED on send: {type(exc).__name__}: {exc}")
else:
    attacks_sent += 1
    print("  Sent (will be blocked on execution)")

# Attack 6: Exfiltrate Data (the "honest" attack)
@bv
def attack_exfiltrate(data):
    """Attack payload: return the input data wholesale, with no forbidden calls.

    Args:
        data: The object to leak; ``to_dict()`` is used when available.

    Returns:
        A dict with ``'all_data'`` set to ``data.to_dict()`` if ``data`` has
        that method, otherwise ``str(data)``.
    """
    return {'all_data': data.to_dict() if hasattr(data, 'to_dict') else str(data)}

print("Attack 6: Exfiltrate data...")
try:
    # Call the decorated payload on the Twin and request a private run of the result.
    result6 = attack_exfiltrate(user_data)
    result6.request_private()
except Exception as exc:
    # Any exception at this point is counted as a rejection at submission time.
    attacks_blocked_on_send += 1
    print(f"  BLOCKED on send: {type(exc).__name__}: {exc}")
else:
    attacks_sent += 1
    print("  Sent (DO must review before approving)")

# Attack 7: Builtins Escape
@bv
def attack_builtins(data):
    """Attack payload: reach ``__import__`` through the ``__builtins__`` object.

    Args:
        data: Input passed by the caller; not used by this payload.

    Returns:
        ``{'escaped': True, 'cwd': <current working directory>}`` when
        ``__builtins__`` exposes an ``__import__`` attribute, otherwise
        ``{'escaped': False}``.
    """
    b = __builtins__
    # hasattr() only finds __import__ when __builtins__ is a module-like object;
    # when it is a plain dict the check is False and the payload reports no escape.
    if hasattr(b, '__import__'):
        return {'escaped': True, 'cwd': b.__import__('os').getcwd()}
    return {'escaped': False}

print("Attack 7: Builtins escape...")
try:
    # Call the decorated payload on the Twin and request a private run of the result.
    result7 = attack_builtins(user_data)
    result7.request_private()
except Exception as exc:
    # Any exception at this point is counted as a rejection at submission time.
    attacks_blocked_on_send += 1
    print(f"  BLOCKED on send: {type(exc).__name__}: {exc}")
else:
    attacks_sent += 1
    print("  Sent (will be blocked on execution)")

# Attack 8: Compile/Exec
@bv
def attack_compile(data):
    """Attack payload: ``compile`` a source string that imports ``os`` and ``exec`` it.

    Args:
        data: Input passed by the caller; not used by this payload.

    Returns:
        A dict with ``'compiled_result'`` set to the ``result`` variable left in
        the exec namespace (the current working directory), or ``'failed'`` if
        that variable is absent.
    """
    code = compile("import os; result = os.getcwd()", "<evil>", "exec")
    # Run in a fresh namespace so the outcome can be read back from it.
    ns = {}
    exec(code, ns)
    return {'compiled_result': ns.get('result', 'failed')}

print("Attack 8: Compile/exec...")
try:
    # Call the decorated payload on the Twin and request a private run of the result.
    result8 = attack_compile(user_data)
    result8.request_private()
except Exception as exc:
    # Any exception at this point is counted as a rejection at submission time.
    attacks_blocked_on_send += 1
    print(f"  BLOCKED on send: {type(exc).__name__}: {exc}")
else:
    attacks_sent += 1
    print("  Sent (will be blocked on execution)")

# Submission summary: requests that went out vs. requests that raised locally.
total_attempts = attacks_sent + attacks_blocked_on_send
print("\n" + "=" * 60)
print(f"Attacks sent: {attacks_sent}")
print(f"Attacks blocked on send: {attacks_blocked_on_send}")
print(f"Total attack attempts: {total_attempts}")

## Step 4: Check Attack Results

Wait for victim to process attacks, then check if any succeeded.

In [None]:
import time

# Literal fragments of the Data Owner's secrets; finding one in a response is a leak.
SECRET_MARKERS = ('TOP_SECRET', 'sk-live-SUPER')

# Dict keys that the attack_* payloads in Step 3 return when they run to completion.
# A response dict carrying any of them means attack code actually executed, even
# if none of the literal secret markers appear in it. 'escaped' is deliberately
# omitted: attack_builtins returns {'escaped': False} when its escape fails.
ATTACK_RESULT_KEYS = frozenset({
    'stolen', 'whoami', 'exfiltrated', 'passwd',
    'eval_result', 'cwd', 'compiled_result', 'all_data',
})

# Brief wait to let DO process (it runs in parallel)
print("Brief wait for victim to process attacks...")
time.sleep(3)

print("\n" + "="*60)
print("ATTACK RESULTS")
print("="*60)

# Check inbox for any responses (should be empty since DO doesn't approve)
inbox = bv.inbox()
print(f"\nResponses in inbox: {len(inbox)}")

secrets_found = False
attack_executed = False
# Index explicitly: len() and integer indexing are the only inbox operations
# this notebook is known to rely on.
for i in range(len(inbox)):
    item = inbox[i]
    name = item.name or f"item_{i}"
    print(f"\n--- {name} ---")
    try:
        item.load(trust_loader=True)
        value = item.value
        result_str = str(value)
    except Exception as e:
        # A response that cannot be loaded is reported and skipped, not treated as a leak.
        print(f"Could not load: {e}")
        continue

    print(f"Result: {result_str[:200]}..." if len(result_str) > 200 else f"Result: {result_str}")

    # Check for leaked secrets
    if any(marker in result_str for marker in SECRET_MARKERS):
        print("SECURITY BREACH: SECRET DATA LEAKED!")
        secrets_found = True

    # Check for evidence that a payload ran: only dict results are inspected so
    # that an error message quoting the payload source is not misread as success.
    if isinstance(value, dict):
        payload_keys = ATTACK_RESULT_KEYS.intersection(value)
        if payload_keys:
            print(f"SECURITY BREACH: ATTACK CODE EXECUTED (returned {sorted(payload_keys)})")
            attack_executed = True

print("\n" + "="*60)
if secrets_found:
    print("SECURITY TEST FAILED - Secrets were leaked!")
elif attack_executed:
    print("SECURITY TEST FAILED - Attack code executed on the victim!")
else:
    print("SECURITY TEST PASSED - No secrets leaked!")
print("="*60)

## Summary

All attacks above should have been **BLOCKED** by:

1. **Static analysis** - Blocks `import os`, `subprocess`, etc. before execution
2. **RestrictedPython sandbox** - Blocks `eval`, `exec`, `compile`, `open`
3. **Import allowlist** - Only allows safe modules like `json`, `pandas`, `numpy`
4. **`__builtins__` replacement** - Removes dangerous builtins
5. **Path validation** - Blocks `..` traversal and symlink escapes

The Data Owner's secrets (`TOP_SECRET_PASSWORD_12345`) should remain protected!