Skip to content

Commit

Permalink
Merge c22a5a4 into 5bbbdc6
Browse files Browse the repository at this point in the history
  • Loading branch information
joamag committed Feb 6, 2019
2 parents 5bbbdc6 + c22a5a4 commit 3c1e568
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 45 deletions.
21 changes: 19 additions & 2 deletions src/netius/base/common.py
Expand Up @@ -2042,7 +2042,7 @@ def datagram(
# creates a new connection object representing the datagram socket
# that has just been created to be used for upper level operations
# and then immediately sets it as connected
connection = self.new_connection(_socket, datagram = True)
connection = self.new_connection_base(_socket, datagram = True)
connection.open()
connection.set_connected()

Expand Down Expand Up @@ -2157,7 +2157,7 @@ def connect(
# creates the connection object using the typical constructor
# and then sets the SSL host (for verification) if the verify
# SSL option is defined (secured and verified connection)
connection = self.new_connection(_socket, address, ssl = ssl)
connection = self.new_connection_base(_socket, address, ssl = ssl)
if ssl_verify: connection.ssl_host = host

# schedules the underlying non blocking connect operation to
Expand Down Expand Up @@ -2683,6 +2683,23 @@ def new_connection(
ssl = ssl
)

def new_connection_base(
self,
socket,
address = None,
datagram = False,
ssl = False
):
connection = Connection(
owner = self,
socket = socket,
address = address,
datagram = datagram,
ssl = ssl
)
connection._base = True
return connection

def add_callback(self, socket, callback):
callbacks = self.callbacks_m.get(socket, [])
if callback in callbacks: return
Expand Down
2 changes: 1 addition & 1 deletion src/netius/base/container.py
Expand Up @@ -39,7 +39,7 @@

from . import server

from .common import * #@UnusedWildImport
from .common import * #@UnusedWildImport pylint: disable=W0614

class Container(Base):

Expand Down
2 changes: 1 addition & 1 deletion src/netius/base/errors.py
Expand Up @@ -50,7 +50,7 @@ class NetiusError(BaseException):

def __init__(self, *args, **kwargs):
BaseException.__init__(self, *args)
message = args[0] if len(args) > 0 else ""
message = args[0] if args else ""
kwargs["message"] = kwargs.get("message", message)
self.kwargs = kwargs
self._uid = None
Expand Down
74 changes: 37 additions & 37 deletions src/netius/base/poll.py
Expand Up @@ -88,7 +88,7 @@ def close(self):
self.error_o.clear()

def poll(self):
return []
return ([], [], [])

def poll_owner(self):
reads, writes, errors = self.poll()
Expand Down Expand Up @@ -188,7 +188,7 @@ def open(self, timeout = POLL_TIMEOUT):
self._open = True
self.timeout = timeout

self.epoll = select.epoll() #@UndefinedVariable
self.epoll = select.epoll() #@UndefinedVariable pylint: disable=E1101

self.fd_m = {}

Expand All @@ -215,13 +215,13 @@ def poll(self):

events = self.epoll.poll(self.timeout)
for fd, event in events:
if event & select.EPOLLIN: #@UndefinedVariable
if event & select.EPOLLIN: #@UndefinedVariable pylint: disable=E1101
socket = self.fd_m.get(fd, None)
socket and result[0].append(socket)
if event & select.EPOLLOUT: #@UndefinedVariable
if event & select.EPOLLOUT: #@UndefinedVariable pylint: disable=E1101
socket = self.fd_m.get(fd, None)
socket and result[1].append(socket)
if event & select.EPOLLERR or event & select.EPOLLHUP: #@UndefinedVariable
if event & select.EPOLLERR or event & select.EPOLLHUP: #@UndefinedVariable pylint: disable=E1101
socket = self.fd_m.get(fd, None)
socket and result[2].append(socket)

Expand All @@ -237,9 +237,9 @@ def sub_read(self, socket, owner = None):
self.read_o[socket] = owner
self.write_o[socket] = owner
self.error_o[socket] = owner
self.epoll.register( #@UndefinedVariable
self.epoll.register( #@UndefinedVariable pylint: disable=E1101
socket_fd,
select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP | select.EPOLLET #@UndefinedVariable
select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP | select.EPOLLET #@UndefinedVariable pylint: disable=E1101
)

def sub_write(self, socket, owner = None):
Expand All @@ -251,7 +251,7 @@ def sub_error(self, socket, owner = None):
def unsub_read(self, socket):
if not socket in self.read_o: return
socket_fd = socket.fileno()
self.epoll.unregister( #@UndefinedVariable
self.epoll.unregister( #@UndefinedVariable pylint: disable=E1101
socket_fd
)
del self.fd_m[socket_fd]
Expand Down Expand Up @@ -281,7 +281,7 @@ def open(self, timeout = POLL_TIMEOUT):
self.timeout = timeout
if self.timeout < 0: self.timeout = None

self.kqueue = select.kqueue() #@UndefinedVariable
self.kqueue = select.kqueue() #@UndefinedVariable pylint: disable=E1101

self.fd_m = {}

Expand All @@ -307,16 +307,16 @@ def poll(self):

events = self.kqueue.control(None, 32, self.timeout)
for event in events:
if event.flags & select.KQ_EV_ERROR: #@UndefinedVariable
if event.flags & select.KQ_EV_ERROR: #@UndefinedVariable pylint: disable=E1101
socket = self.fd_m.get(event.udata, None)
socket and result[2].append(socket)
elif event.filter == select.KQ_FILTER_READ: #@UndefinedVariable
elif event.filter == select.KQ_FILTER_READ: #@UndefinedVariable pylint: disable=E1101
socket = self.fd_m.get(event.udata, None)
index = 2 if event.flags & select.KQ_EV_EOF else 0 #@UndefinedVariable
index = 2 if event.flags & select.KQ_EV_EOF else 0 #@UndefinedVariable pylint: disable=E1101
socket and result[index].append(socket)
elif event.filter == select.KQ_FILTER_WRITE: #@UndefinedVariable
elif event.filter == select.KQ_FILTER_WRITE: #@UndefinedVariable pylint: disable=E1101
socket = self.fd_m.get(event.udata, None)
index = 2 if event.flags & select.KQ_EV_EOF else 1 #@UndefinedVariable
index = 2 if event.flags & select.KQ_EV_EOF else 1 #@UndefinedVariable pylint: disable=E1101
socket and result[index].append(socket)

return result
Expand All @@ -331,17 +331,17 @@ def sub_read(self, socket, owner = None):
self.read_o[socket] = owner
self.write_o[socket] = owner
self.error_o[socket] = owner
event = select.kevent( #@UndefinedVariable
event = select.kevent( #@UndefinedVariable pylint: disable=E1101
socket_fd,
filter = select.KQ_FILTER_READ, #@UndefinedVariable
flags = select.KQ_EV_ADD | select.KQ_EV_CLEAR, #@UndefinedVariable
filter = select.KQ_FILTER_READ, #@UndefinedVariable pylint: disable=E1101
flags = select.KQ_EV_ADD | select.KQ_EV_CLEAR, #@UndefinedVariable pylint: disable=E1101
udata = socket_fd
)
self.kqueue.control([event], 0)
event = select.kevent( #@UndefinedVariable
event = select.kevent( #@UndefinedVariable pylint: disable=E1101
socket_fd,
filter = select.KQ_FILTER_WRITE, #@UndefinedVariable
flags = select.KQ_EV_ADD | select.KQ_EV_CLEAR, #@UndefinedVariable
filter = select.KQ_FILTER_WRITE, #@UndefinedVariable pylint: disable=E1101
flags = select.KQ_EV_ADD | select.KQ_EV_CLEAR, #@UndefinedVariable pylint: disable=E1101
udata = socket_fd
)
self.kqueue.control([event], 0)
Expand All @@ -355,16 +355,16 @@ def sub_error(self, socket, owner = None):
def unsub_read(self, socket):
if not socket in self.read_o: return
socket_fd = socket.fileno()
event = select.kevent( #@UndefinedVariable
event = select.kevent( #@UndefinedVariable pylint: disable=E1101
socket_fd,
filter = select.KQ_FILTER_READ, #@UndefinedVariable
flags = select.KQ_EV_DELETE #@UndefinedVariable
filter = select.KQ_FILTER_READ, #@UndefinedVariable pylint: disable=E1101
flags = select.KQ_EV_DELETE #@UndefinedVariable pylint: disable=E1101
)
self.kqueue.control([event], 0)
event = select.kevent( #@UndefinedVariable
event = select.kevent( #@UndefinedVariable pylint: disable=E1101
socket_fd,
filter = select.KQ_FILTER_WRITE, #@UndefinedVariable
flags = select.KQ_EV_DELETE #@UndefinedVariable
filter = select.KQ_FILTER_WRITE, #@UndefinedVariable pylint: disable=E1101
flags = select.KQ_EV_DELETE #@UndefinedVariable pylint: disable=E1101
)
self.kqueue.control([event], 0)
del self.fd_m[socket_fd]
Expand Down Expand Up @@ -393,7 +393,7 @@ def open(self, timeout = POLL_TIMEOUT):
self._open = True
self.timeout = timeout

self._poll = select.poll() #@UndefinedVariable
self._poll = select.poll() #@UndefinedVariable pylint: disable=E1101

self.read_fd = {}
self.write_fd = {}
Expand Down Expand Up @@ -421,13 +421,13 @@ def poll(self):

events = self._poll.poll(self.timeout * 1000)
for fd, event in events:
if event & select.POLLIN: #@UndefinedVariable
if event & select.POLLIN: #@UndefinedVariable pylint: disable=E1101
socket = self.read_fd.get(fd, None)
socket and result[0].append(socket)
if event & select.POLLOUT: #@UndefinedVariable
if event & select.POLLOUT: #@UndefinedVariable pylint: disable=E1101
socket = self.write_fd.get(fd, None)
socket and result[1].append(socket)
if event & select.POLLERR or event & select.POLLHUP: #@UndefinedVariable
if event & select.POLLERR or event & select.POLLHUP: #@UndefinedVariable pylint: disable=E1101
socket = self.read_fd.get(fd, None)
socket and result[2].append(socket)

Expand All @@ -441,19 +441,19 @@ def sub_read(self, socket, owner = None):
socket_fd = socket.fileno()
self.read_fd[socket_fd] = socket
self.read_o[socket] = owner
self._poll.register( #@UndefinedVariable
self._poll.register( #@UndefinedVariable pylint: disable=E1101
socket_fd,
select.POLLIN #@UndefinedVariable
select.POLLIN #@UndefinedVariable pylint: disable=E1101
)

def sub_write(self, socket, owner = None):
if socket in self.write_o: return
socket_fd = socket.fileno()
self.write_fd[socket_fd] = socket
self.write_o[socket] = owner
self._poll.modify( #@UndefinedVariable
self._poll.modify( #@UndefinedVariable pylint: disable=E1101
socket_fd,
select.POLLIN | select.POLLOUT #@UndefinedVariable
select.POLLIN | select.POLLOUT #@UndefinedVariable pylint: disable=E1101
)

def sub_error(self, socket, owner = None):
Expand All @@ -463,7 +463,7 @@ def sub_error(self, socket, owner = None):
def unsub_read(self, socket):
if not socket in self.read_o: return
socket_fd = socket.fileno()
self._poll.unregister( #@UndefinedVariable
self._poll.unregister( #@UndefinedVariable pylint: disable=E1101
socket_fd
)
del self.read_fd[socket_fd]
Expand All @@ -472,9 +472,9 @@ def unsub_read(self, socket):
def unsub_write(self, socket):
if not socket in self.write_o: return
socket_fd = socket.fileno()
self._poll.modify( #@UndefinedVariable
self._poll.modify( #@UndefinedVariable pylint: disable=E1101
socket_fd,
select.POLLIN #@UndefinedVariable
select.POLLIN #@UndefinedVariable pylint: disable=E1101
)
del self.write_fd[socket_fd]
del self.write_o[socket]
Expand Down
34 changes: 30 additions & 4 deletions src/netius/base/server.py
Expand Up @@ -733,7 +733,16 @@ def on_read(self, _socket):
if not connection.status == OPEN: return
if not connection.renable == True: return

#@todo this is a temporary hack to redirect stuff up
if hasattr(connection, "_base"): return Base.on_read(self, _socket)

try:
# in case the connection is under the connecting state
# the socket must be verified for errors and in case
# there's none the connection must proceed, for example
# the SSL connection handshake must be performed/retried
if connection.connecting: self._connectf(connection)

# verifies if there's any pending operations in the
# connection (eg: SSL handshaking) and performs it trying
# to finish them, if they are still pending at the current
Expand Down Expand Up @@ -776,6 +785,15 @@ def on_write(self, _socket):
if not connection: return
if not connection.status == OPEN: return

#@todo this is a temporary hack to redirect stuff up
if hasattr(connection, "_base"): return Base.on_write(self, _socket)

# in case the connection is under the connecting state
# the socket must be verified for errors and in case
# there's none the connection must proceed, for example
# the SSL connection handshake must be performed/retried
if connection.connecting: self._connectf(connection)

try:
connection._send()
except ssl.SSLError as error:
Expand Down Expand Up @@ -804,6 +822,9 @@ def on_error(self, _socket):
if not connection: return
if not connection.status == OPEN: return

#@todo this is a temporary hack to redirect stuff up
if hasattr(connection, "_base"): return Base.on_error(self, _socket)

connection.close()

def on_exception(self, exception, connection):
Expand Down Expand Up @@ -848,10 +869,12 @@ def on_ssl(self, connection):
if self.ssl_dump: connection.ssl_dump_certificate(self.ssl_dump)
else: connection.ssl_dump_certificate()

# in case the current connection is under the upgrade
# status calls the proper event handler so that the
# connection workflow may proceed accordingly
if connection.upgrading: self.on_upgrade(connection)
# verifies if the connection is either connecting or upgrading
# and calls the proper event handler for each event, this is
# required because the connection workflow is probably dependent
# on the calling of these event handlers to proceed
if connection.connecting: self.on_connect(connection)
elif connection.upgrading: self.on_upgrade(connection)

def on_data(self, connection, data):
connection.set_data(data)
Expand Down Expand Up @@ -932,6 +955,9 @@ def on_socket_d(self, socket_c):
def _ssl_handshake(self, connection):
Server._ssl_handshake(self, connection)

#@todo this is a temporary hack to redirect stuff up
if hasattr(connection, "_base"): return Base._ssl_handshake(self, connection)

# verifies if the socket still has finished the SSL handshaking
# process (by verifying the appropriate flag) and then if that's
# not the case returns immediately (nothing done)
Expand Down
3 changes: 3 additions & 0 deletions src/netius/servers/http.py
Expand Up @@ -743,6 +743,9 @@ def info_dict(self, full = False):
return info

def on_data(self, connection, data):
#@todo this is a temporary hack to redirect stuff up
if hasattr(connection, "_base"): return netius.Base.on_data(self, connection, data)

netius.StreamServer.on_data(self, connection, data)
connection.parse(data)

Expand Down

0 comments on commit 3c1e568

Please sign in to comment.