Skip to content

Commit

Permalink
Merge 1f89fdf into 3e949d5
Browse files Browse the repository at this point in the history
  • Loading branch information
nakhan98 committed Jun 27, 2018
2 parents 3e949d5 + 1f89fdf commit 29586ce
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 1 deletion.
28 changes: 27 additions & 1 deletion radius.py
Expand Up @@ -197,6 +197,11 @@
# -------------------------------


# Valid protocols
UDP = "udp"
TCP = "tcp"


class Error(Exception):
"""
Base Error class.
Expand Down Expand Up @@ -477,18 +482,24 @@ def verify(self, data):
return Message.unpack(self.secret, data)


class RadiusException(Exception):
pass


class Radius(object):
"""
Radius client implementation.
"""

def __init__(self, secret, host='radius', port=DEFAULT_PORT,
retries=DEFAULT_RETRIES, timeout=DEFAULT_TIMEOUT):
retries=DEFAULT_RETRIES, timeout=DEFAULT_TIMEOUT,
proto=UDP):
self._secret = bytes_safe(secret)
self.retries = retries
self.timeout = timeout
self._host = host
self._port = port
self._proto = proto

@property
def host(self):
Expand All @@ -504,11 +515,26 @@ def secret(self):

@contextmanager
def connect(self):
if self._proto == UDP:
return self._connect_udp()
elif self._proto == TCP:
return self._connect_tcp()
else:
raise RadiusException("Invalid protocol specified: %s"
% self._proto)

def _connect_udp(self):
with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as c:
c.connect((self.host, self.port))
LOGGER.debug('Connected to %s:%s', self.host, self.port)
yield c

def _connect_tcp(self):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as c:
c.connect((self.host, self.port))
LOGGER.debug('Connected to %s:%s', self.host, self.port)
yield c

def send_message(self, message):
send = message.pack()

Expand Down
64 changes: 64 additions & 0 deletions tests.py
Expand Up @@ -159,8 +159,36 @@ def setUp(self):
self.sock.bind(('127.0.0.1', 0))
self.port = self.sock.getsockname()[1]

# Setup a test tcp server
self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.tcp_sock.bind(('127.0.0.1', 0))
self.tcp_port = self.tcp_sock.getsockname()[1]
self.tcp_sock.listen(1)
self.tcp_received_data = None

def tearDown(self):
self.sock.close()
self.tcp_sock.close()

@staticmethod
def assertEventually(func, fail_condition=False, timeout=3, interval=0.1,
msg=None, *args, **kwargs):
start_time = time.time()
stop_time = start_time + timeout

current_time = start_time
while (current_time < stop_time):
try:
assert func(*args, **kwargs) != fail_condition
except AssertionError:
pass
else:
return True

time.sleep(interval)
current_time = time.time()

raise AssertionError("Timed out - %s" % msg)

def startServer(self, target):
t = threading.Thread(target=target)
Expand All @@ -176,6 +204,26 @@ def test_connect(self):

self.assertEqual(b'hello?', self.sock.recv(32))

def test_connect_tcp(self):
"""Test connecting via tcp."""
def _reply_to_client():
conn, addr = self.tcp_sock.accept()
self.tcp_received_data = conn.recv(32)
conn.send(b'bye')

self.startServer(_reply_to_client)

r = radius.Radius(TEST_SECRET, host='localhost', port=self.tcp_port, proto=radius.TCP)
with r.connect() as c:
c.send(b'hello?')

self.assertEventually(lambda: b'hello?' == self.tcp_received_data)

def test_invalid_proto(self):
"""Test that exception is raised if specifying an invalid protocol"""
r = radius.Radius(TEST_SECRET, host='localhost', port=self.tcp_port, proto="invalid_proto")
self.assertRaises(radius.RadiusException, r.connect)

def test_failure(self):
"""Test sending a message and receiving a reject reply."""
def _reply_to_client():
Expand Down Expand Up @@ -204,6 +252,22 @@ def _reply_to_client():
r = radius.Radius(TEST_SECRET, host='localhost', port=self.port)
self.assertTrue(r.authenticate('username', 'password'))

def test_success_over_tcp(self):
"""Test sending a message and receiving an accept reply via tcp."""
def _reply_to_client():
"""Thread to act as server."""
conn, addr = self.tcp_sock.accept()
data = conn.recv(radius.PACKET_MAX)
m1 = radius.Message.unpack(TEST_SECRET, data)
m2 = create_reply(m1, radius.CODE_ACCESS_ACCEPT)
conn.send(m2.pack())
conn.close()

self.startServer(_reply_to_client)

r = radius.Radius(TEST_SECRET, host='localhost', port=self.tcp_port, proto=radius.TCP)
self.assertTrue(r.authenticate('username', 'password'))

def test_challenge(self):
"""Test sending a message and receiving an challenge reply."""
def _reply_to_client():
Expand Down

0 comments on commit 29586ce

Please sign in to comment.