Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 204 additions & 0 deletions exploit_patch.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
cat > security_patch.py << 'EOF'
#!/usr/bin/env python3
"""
SECURITY PATCH for PyBitMessage - Critical RCE Vulnerability Fix
Author: Security Researcher
Date: $(date +%Y-%m-%d)

VULNERABILITY DESCRIPTION:
The file pybitmessage/network/knownnodes.py contained a critical Remote Code
Execution (RCE) vulnerability through unsafe pickle deserialization.

The vulnerable code was:
knownNodes = pickle.load(source) # nosec B301

WHY THIS IS CRITICAL:
- pickle.load() can execute arbitrary Python code during deserialization
- Any maliciously crafted knownnodes.dat file could compromise the system
- This is the same type of vulnerability that was exploited in PyBitMessage 0.6.2
- The # nosec comment indicated awareness but no actual fix was implemented

This patch completely removes pickle functionality and provides safe fallbacks.
"""

import os
import re
import sys

def apply_security_patch():
"""Apply security patch to remove unsafe pickle deserialization"""

file_path = 'pybitmessage/network/knownnodes.py'

if not os.path.exists(file_path):
print(f"❌ Error: {file_path} not found!")
return False

# Create backup
backup_path = file_path + '.backup'
try:
with open(file_path, 'r') as f:
original_content = f.read()
with open(backup_path, 'w') as f:
f.write(original_content)
print(f"✅ Backup created: {backup_path}")
except Exception as e:
print(f"❌ Failed to create backup: {e}")
return False

# Remove pickle import
new_content = re.sub(
r'^import pickle.*# nosec B403\s*$',
'# import pickle # REMOVED FOR SECURITY - UNSAFE DESERIALIZATION',
original_content,
flags=re.MULTILINE
)

# Replace pickle deserialization function with safe version
old_pickle_function = '''def pickle_deserialize_old_knownnodes(source):
"""
Unpickle source and reorganize knownnodes dict if it has old format
the old format was {Peer:lastseen, ...}
the new format is {Peer:{"lastseen":i, "rating":f}}
"""
global knownNodes
logger.debug("DEBUG: Starting pickle_deserialize_old_knownnodes")
knownNodes = pickle.load(source) # nosec B301
logger.debug("DEBUG: Loaded old format knownNodes with %d streams", len(knownNodes))
for stream in knownNodes.keys():
logger.debug("DEBUG: Processing stream %s with %d nodes", stream, len(knownNodes[stream]))
for node, params in six.iteritems(knownNodes[stream]):
if isinstance(params, (float, int)):
logger.debug("DEBUG: Converting old format node %s", node.host)
addKnownNode(stream, node, params)
logger.debug("DEBUG: Completed pickle_deserialize_old_knownnodes")'''

new_pickle_function = '''def pickle_deserialize_old_knownnodes(source):
"""
DISABLED FOR SECURITY - UNSAFE PICKLE DESERIALIZATION REMOVED

Original purpose: Unpickle source and reorganize knownnodes dict if it has old format
Security risk: pickle.load() allows arbitrary code execution via malicious knownnodes.dat

This function now safely handles the case without RCE vulnerability.
"""
global knownNodes
logger.error("🔒 SECURITY: Unsafe pickle deserialization was attempted but has been disabled")
logger.error("🔒 This prevents Remote Code Execution via malicious knownnodes.dat files")
logger.error("🔒 Please delete knownnodes.dat and restart PyBitMessage to regenerate safe version")

# Safe fallback: use empty knownNodes instead of risky deserialization
knownNodes = {}

# Security measure: automatically remove potentially malicious file
try:
import os
from pybitmessage import state
knownnodes_path = os.path.join(state.appdata, 'knownnodes.dat')
if os.path.exists(knownnodes_path):
os.remove(knownnodes_path)
logger.info("🔒 Security: Removed potentially malicious knownnodes.dat")
except Exception as e:
logger.error("Could not remove knownnodes.dat: %s", e)'''

if old_pickle_function in new_content:
new_content = new_content.replace(old_pickle_function, new_pickle_function)
else:
print("⚠️ Could not find exact pickle function pattern, applying import removal only")

# Update readKnownNodes to handle the change
new_content = re.sub(
r'except ValueError:\s*logger\.debug\("DEBUG: JSON failed, trying pickle deserialization"\)\s*source\.seek\(0\)\s*pickle_deserialize_old_knownnodes\(source\)',
'except ValueError:\n logger.debug("DEBUG: JSON failed - pickle disabled for security")\n createDefaultKnownNodes()',
new_content
)

# Write patched file
try:
with open(file_path, 'w') as f:
f.write(new_content)
print("✅ Security patch applied successfully!")
print("🔒 Removed: import pickle (RCE vulnerability)")
print("🔒 Fixed: pickle_deserialize_old_knownnodes() function")
print("🔒 Updated: readKnownNodes() to use safe fallback")

# Verify patch
with open(file_path, 'r') as f:
patched_content = f.read()
if 'import pickle' in patched_content and 'import pickle # REMOVED' not in patched_content:
print("⚠️ Warning: pickle import might still be present")
else:
print("✅ Verified: pickle import properly removed")

return True

except Exception as e:
print(f"❌ Failed to write patched file: {e}")
# Restore backup
try:
with open(backup_path, 'r') as f:
backup_content = f.read()
with open(file_path, 'w') as f:
f.write(backup_content)
print("✅ Original file restored from backup")
except:
print("❌ CRITICAL: Could not restore backup!")
return False

def verify_vulnerability_fixed():
"""Verify that the RCE vulnerability has been patched"""
print("\n" + "="*60)
print("SECURITY VERIFICATION")
print("="*60)

file_path = 'pybitmessage/network/knownnodes.py'

try:
with open(file_path, 'r') as f:
content = f.read()

checks = [
('import pickle' not in content or 'import pickle # REMOVED' in content,
"Pickle import removed or commented"),
('pickle.load(' not in content, "No unsafe pickle.load() calls"),
('def pickle_deserialize_old_knownnodes' in content, "Function still exists for compatibility"),
('logger.error("🔒 SECURITY:"' in content, "Security warnings added")
]

all_passed = True
for check, description in checks:
if check:
print(f"✅ PASS: {description}")
else:
print(f"❌ FAIL: {description}")
all_passed = False

if all_passed:
print("\n🎉 ALL SECURITY CHECKS PASSED! RCE VULNERABILITY FIXED!")
else:
print("\n⚠️ Some security checks failed - please review manually")

return all_passed

except Exception as e:
print(f"❌ Verification failed: {e}")
return False

if __name__ == "__main__":
print("PyBitMessage Security Patch - RCE Vulnerability Fix")
print("=" * 50)

if apply_security_patch():
verify_vulnerability_fixed()
print("\n📝 NEXT STEPS:")
print("1. Delete any existing knownnodes.dat files")
print("2. Restart PyBitMessage to generate new safe knownnodes.dat")
print("3. Share this patch with the PyBitMessage community")
print("4. Consider updating tests in tests/core.py to remove pickle usage")
else:
print("\n❌ Patch failed - please apply changes manually")
sys.exit(1)
EOF

echo "✅ Security patch script created: security_patch.py"
echo "🚀 Run it with: python3 security_patch.py"