Skip to content
Permalink
Browse files

Add support to set additional socket options

from circuits.net.sockets import TCPServer
from socket import SOL_SOCKET, SO_REUSEPORT
s = TCPServer(('0.0.0.0', 8090), socket_options=[(SOL_SOCKET, SO_REUSEPORT, 1)])
assert s._sock.getsockopt(SOL_SOCKET, SO_REUSEPORT) == 1
  • Loading branch information...
spaceone committed Jan 24, 2017
1 parent 25442fc commit e92d26b1f4e534e1b004531e61bdea6d00d0bbd8
Showing with 62 additions and 47 deletions.
  1. +55 −47 circuits/net/sockets.py
  2. +7 −0 tests/net/test_socket_options.py
@@ -10,8 +10,8 @@
EMFILE, ENFILE, ENOBUFS, ENOMEM, ENOTCONN, EPERM, EPIPE, EWOULDBLOCK,
)
from socket import (
AF_INET, AF_INET6, IPPROTO_TCP, SO_BROADCAST, SO_REUSEADDR, SOCK_DGRAM,
SOCK_STREAM, SOL_SOCKET, TCP_NODELAY, error as SocketError, gaierror,
AF_INET, AF_INET6, AF_UNIX, IPPROTO_IP, IPPROTO_TCP, SO_BROADCAST, SO_REUSEADDR,
SOCK_DGRAM, SOCK_STREAM, SOL_SOCKET, TCP_NODELAY, error as SocketError, gaierror,
getaddrinfo, getfqdn, gethostbyname, gethostname, socket,
)
from time import time
@@ -78,6 +78,11 @@ class Client(BaseComponent):

channel = "client"

socket_family = AF_INET
socket_type = SOCK_STREAM
socket_protocol = IPPROTO_IP
socket_options = []

def __init__(self, bind=None, bufsize=BUFSIZE, channel=channel, **kwargs):
super(Client, self).__init__(channel=channel, **kwargs)

@@ -233,24 +238,29 @@ def __on_write(self, sock):
elif self._poller.isWriting(self._sock):
self._poller.removeWriter(self._sock)

def _create_socket(self):
sock = socket(self.socket_family, self.socket_type, self.socket_protocol)

for option in self.socket_options:
sock.setsockopt(*option)
sock.setblocking(False)
if self._bind is not None:
sock.bind(self._bind)
return sock


class TCPClient(Client):

socket_family = AF_INET
socket_type = SOCK_STREAM
socket_protocol = IPPROTO_TCP
socket_options = [
(IPPROTO_TCP, TCP_NODELAY, 1),
]

def init(self, connect_timeout=5, *args, **kwargs):
self.connect_timeout = connect_timeout

def _create_socket(self):
sock = socket(self.socket_family, SOCK_STREAM, IPPROTO_TCP)
if self._bind is not None:
sock.bind(self._bind)

sock.setblocking(False)
sock.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)

return sock

@handler("connect") # noqa
def connect(self, host, port, secure=False, **kwargs):
# XXX: C901: This has a high McCacbe complexity score of 10.
@@ -322,16 +332,9 @@ def parse_bind_parameter(self, bind_parameter):

class UNIXClient(Client):

def _create_socket(self):
from socket import AF_UNIX

sock = socket(AF_UNIX, SOCK_STREAM)
if self._bind is not None:
sock.bind(self._bind)

sock.setblocking(False)

return sock
socket_family = AF_UNIX
socket_type = SOCK_STREAM
socket_options = []

@handler("ready")
def ready(self, component):
@@ -387,11 +390,13 @@ def on_error(sock, err):
class Server(BaseComponent):

channel = "server"
socket_protocol = IPPROTO_IP

def __init__(self, bind, secure=False, backlog=BACKLOG,
bufsize=BUFSIZE, channel=channel, **kwargs):
super(Server, self).__init__(channel=channel)

self.socket_options = self.socket_options[:] + kwargs.get('socket_options', [])
self._bind = self.parse_bind_parameter(bind)

self._backlog = backlog
@@ -647,18 +652,28 @@ def _on_write(self, sock):
elif self._poller.isWriting(sock):
self._poller.removeWriter(sock)

def _create_socket(self):
sock = socket(self.socket_family, self.socket_type, self.socket_protocol)

for option in self.socket_options:
sock.setsockopt(*option)
sock.setblocking(False)
if self._bind is not None:
sock.bind(self._bind)
return sock


class TCPServer(Server):

socket_family = AF_INET
socket_type = SOCK_STREAM
socket_options = [
(SOL_SOCKET, SO_REUSEADDR, 1),
(IPPROTO_TCP, TCP_NODELAY, 1),
]

def _create_socket(self):
sock = socket(self.socket_family, SOCK_STREAM)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
sock.setblocking(False)
sock.bind(self._bind)
sock = super(TCPServer, self)._create_socket()
sock.listen(self._backlog)

return sock
@@ -706,17 +721,17 @@ def parse_bind_parameter(self, bind_parameter):

class UNIXServer(Server):

def _create_socket(self):
from socket import AF_UNIX
socket_family = AF_UNIX
socket_type = SOCK_STREAM
socket_options = [
(SOL_SOCKET, SO_REUSEADDR, 1),
]

def _create_socket(self):
if os.path.exists(self._bind):
os.unlink(self._bind)

sock = socket(AF_UNIX, SOCK_STREAM)
sock.bind(self._bind)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.setblocking(False)
sock = super(UNIXServer, self)._create_socket()
sock.listen(self._backlog)

return sock
@@ -725,18 +740,11 @@ def _create_socket(self):
class UDPServer(Server):

socket_family = AF_INET

def _create_socket(self):
sock = socket(self.socket_family, SOCK_DGRAM)

sock.bind(self._bind)

sock.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

sock.setblocking(False)

return sock
socket_type = SOCK_DGRAM
socket_options = [
(SOL_SOCKET, SO_BROADCAST, 1),
(SOL_SOCKET, SO_REUSEADDR, 1)
]

def _close(self, sock):
self._poller.discard(sock)
@@ -0,0 +1,7 @@
from circuits.net.sockets import TCPServer
from socket import SOL_SOCKET, SO_REUSEPORT


def test_socket_options_server():
s = TCPServer(('0.0.0.0', 8090), socket_options=[(SOL_SOCKET, SO_REUSEPORT, 1)])
assert s._sock.getsockopt(SOL_SOCKET, SO_REUSEPORT) == 1

0 comments on commit e92d26b

Please sign in to comment.
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.