Skip to content

Commit

Permalink
feat: allow enforcing of the used IP version
Browse files Browse the repository at this point in the history
Due to the need of whitelisting IP addresses, users might want to explicitly specify the used IP version
  • Loading branch information
d-Rickyy-b committed Jun 22, 2020
1 parent b810c44 commit 3483566
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 3 deletions.
8 changes: 6 additions & 2 deletions pastepwn/core/pastepwn.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,19 @@
from pastepwn.actions import DatabaseAction
from pastepwn.analyzers import AlwaysTrueAnalyzer
from pastepwn.core import ScrapingHandler, ActionHandler, PasteDispatcher
from pastepwn.util import Request
from pastepwn.util import Request, enforce_ip_version


class PastePwn(object):
"""Represents an instance of the pastepwn core module"""

def __init__(self, database=None, proxies=None, store_all_pastes=True):
def __init__(self, database=None, proxies=None, store_all_pastes=True, ip_version=None):
"""
Basic PastePwn object handling the connection to pastebin and all the analyzers and actions
:param database: Database object extending AbstractDB
:param proxies: Dict of proxies as defined in the requests documentation
:param store_all_pastes: Bool to decide if all pastes should be stored into the db
:param ip_version: The IP version pastepwn should use (4|6)
"""
self.logger = logging.getLogger(__name__)
self.is_idle = False
Expand All @@ -31,6 +32,9 @@ def __init__(self, database=None, proxies=None, store_all_pastes=True):
self.__exception_event = Event()
self.__request = Request(proxies) # initialize singleton

# We are trying to enforce a certain version of the Internet Protocol
enforce_ip_version(ip_version)

# Usage of ipify to get the IP - Uses the X-Forwarded-For Header which might
# lead to issues with proxies
try:
Expand Down
4 changes: 3 additions & 1 deletion pastepwn/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
from .request import Request
from .templatingengine import TemplatingEngine
from .threadingutils import start_thread, join_threads
from .network import enforce_ip_version

__all__ = ('Request',
'DictWrapper',
'start_thread',
'join_threads',
'TemplatingEngine',
'listify')
'listify',
'enforce_ip_version')
50 changes: 50 additions & 0 deletions pastepwn/util/network.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
import logging
import socket

import requests.packages.urllib3.util.connection as urllib3_cn


def _allowed_gai_family_4():
"""https://github.com/urllib3/urllib3/blob/master/src/urllib3/util/connection.py"""
return socket.AF_INET


def _allowed_gai_family_6():
"""https://github.com/urllib3/urllib3/blob/master/src/urllib3/util/connection.py"""
return socket.AF_INET6


def _allowed_gai_family_unspec():
"""https://github.com/urllib3/urllib3/blob/master/src/urllib3/util/connection.py"""
return socket.AF_UNSPEC


def enforce_ip_version(version=None):
"""
Tries to enforce either IPv4 or IPv6, depending on the version parameter
:param version: The IP version to use
:return:
"""
if version is None:
urllib3_cn.allowed_gai_family = _allowed_gai_family_unspec
return

version = str(version)

if version == "4":
logging.info("Enforcing IPv4!")
urllib3_cn.allowed_gai_family = _allowed_gai_family_4

elif version == "6":
logging.info("Enforcing IPv6!")
if not urllib3_cn.HAS_IPV6:
raise Exception("Your system can't handle IPv6!")

urllib3_cn.allowed_gai_family = _allowed_gai_family_6

elif version == "5":
raise ValueError("Internet Stream Protocol? Really? There is no actual IPv5!")

else:
raise ValueError("No valid value specified for 'version' parameter! Please use either 4 or 6")
46 changes: 46 additions & 0 deletions pastepwn/util/tests/network_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import unittest
from unittest.mock import Mock

from pastepwn.util import enforce_ip_version

import socket
import requests.packages.urllib3.util.connection as urllib3_cn


class TestNetwork(unittest.TestCase):
"""Tests for the network utils - these tests might fail on CI"""

def test_enforce_ip_version_4(self):
enforce_ip_version(4)
self.assertEqual(urllib3_cn.allowed_gai_family(), socket.AF_INET)
enforce_ip_version("4")
self.assertEqual(urllib3_cn.allowed_gai_family(), socket.AF_INET)

def test_enforce_ip_version_5(self):
with self.assertRaises(Exception):
enforce_ip_version(5)

with self.assertRaises(Exception):
enforce_ip_version("5")

def test_enforce_ip_version_6(self):
enforce_ip_version(6)
self.assertEqual(urllib3_cn.allowed_gai_family(), socket.AF_INET6)
enforce_ip_version("6")
self.assertEqual(urllib3_cn.allowed_gai_family(), socket.AF_INET6)

def test_enforce_ip_version_None(self):
enforce_ip_version(None)
self.assertEqual(urllib3_cn.allowed_gai_family(), socket.AF_UNSPEC)

enforce_ip_version()
self.assertEqual(urllib3_cn.allowed_gai_family(), socket.AF_UNSPEC)

def test_enforce_ip_version_obj(self):
mock = Mock()
with self.assertRaises(ValueError):
enforce_ip_version(mock)


if __name__ == '__main__':
unittest.main()

0 comments on commit 3483566

Please sign in to comment.