Skip to content

Commit

Permalink
feat: initial support for protocol based serving in netius
Browse files Browse the repository at this point in the history
  • Loading branch information
joamag committed Aug 23, 2021
1 parent 76bf922 commit c942189
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 7 deletions.
46 changes: 43 additions & 3 deletions src/netius/base/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ def __init__(self, name = None, handlers = None, *args, **kwargs):
self._forked = False
self._child = False
self._concrete = False
self._services = {}
self._childs = []
self._events = {}
self._notified = []
Expand Down Expand Up @@ -2060,7 +2061,11 @@ def reads(self, reads, state = True):

# iterates over all of the read events and calls the proper on
# read method handler to properly handle each event
for read in reads: self.on_read(read)
for read in reads:
if read in self._services:
service = self._services[read]
self.on_read_s(read, service)
self.on_read(read)

def writes(self, writes, state = True):
# in case the update state is requested updates the current loop
Expand Down Expand Up @@ -2192,7 +2197,8 @@ def serve(
# them may print some specific debugging information
self.on_serve()

#@todo this is the dummy object representing the new server
# creates a new service instance that is going to represents the new server
# that is going to start accepting new connections
service = self.new_service(_socket, ssl)

# delays the operation of calling the callback with the new (service) instance
Expand Down Expand Up @@ -2890,15 +2896,47 @@ def on_error(self, _socket):

connection.close()

def on_read_s(self, _socket, service):
try:
while True:
socket_c, address = _socket.accept()
try: service.on_socket_c(socket_c, address)
except Exception: socket_c.close(); raise
except ssl.SSLError as error:
error_v = error.args[0] if error.args else None
error_m = error.reason if hasattr(error, "reason") else None
if error_v in SSL_SILENT_ERRORS:
self.on_expected_s(error)
elif not error_v in SSL_VALID_ERRORS and\
not error_m in SSL_VALID_REASONS:
self.on_exception_s(error)
except socket.error as error:
error_v = error.args[0] if error.args else None
if error_v in SILENT_ERRORS:
self.on_expected_s(error)
elif not error_v in VALID_ERRORS:
self.on_exception_s(error)
except (KeyboardInterrupt, SystemExit):
raise
except BaseException as exception:
self.on_exception_s(exception)

def on_exception(self, exception, connection):
self.warning(exception)
self.log_stack()
connection.close()

def on_exception_s(self, exception):
self.warning(exception)
self.log_stack()

def on_expected(self, exception, connection):
self.debug(exception)
connection.close()

def on_expected_s(self, exception):
self.debug(exception)

def on_connect(self, connection):
connection.set_connected()
if not hasattr(connection, "tuple"): return
Expand Down Expand Up @@ -3040,7 +3078,9 @@ def del_connection(self, connection):
return self.on_connection_d(connection)

def new_service(self, socket, ssl = False):
return Service(owner = self, socket = socket, ssl = ssl)
service = Service(owner = self, socket = socket, ssl = ssl)
self._services[socket] = service
return service

def add_callback(self, socket, callback):
callbacks = self.callbacks_m.get(socket, [])
Expand Down
34 changes: 34 additions & 0 deletions src/netius/base/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,27 @@ def on_error(connection):
self._loop.delay(lambda: on_complete(connection, True))
yield future

def _start_serving(
self,
protocol_factory,
sock,
sslcontext = None,
server = None,
backlog = 100,
ssl_handshake_timeout = None
):
# @todo this is pending proper Netius implementation
self._add_reader(
sock.fileno(),
self._accept_connection,
protocol_factory,
sock,
sslcontext,
server,
backlog,
ssl_handshake_timeout
)

def _set_current_task(self, task):
"""
Updates the currently executing task in the global
Expand Down Expand Up @@ -498,6 +519,19 @@ def _build_datagram_native(
*args,
**kwargs
):
"""
Builds a datagram assuming that the current event
loop in execution is a Netius one and that the support
for the Netius specific methods exist.
This method is typically faster than using the compat
one which only makes use of the asyncio API.
The end goal of this method is to call the callback method
with a tuple containing both the transport and the protocol
for the requested datagram based "connection".
"""

from . import common

loop = loop or common.get_loop()
Expand Down
94 changes: 93 additions & 1 deletion src/netius/base/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,21 @@
""" The license for the module """

import uuid
import socket

from . import observer
from . import transport

BUFFER_SIZE_S = None
""" The size of both the send and receive buffers for
the socket representing the server, this socket is
responsible for the handling of the new connections """

BUFFER_SIZE_C = None
""" The size of the buffers (send and receive) that
is going to be set on the on the sockets created by
the server (client sockets), this is critical for a
good performance of the server (large value) """

class Service(observer.Observable):
"""
Expand All @@ -56,11 +69,90 @@ class Service(observer.Observable):
def __init__(
self,
owner = None,
transport = None,
socket = None,
ssl = False
ssl = False,
receive_buffer_s = BUFFER_SIZE_S,
send_buffer_s = BUFFER_SIZE_S,
receive_buffer_c = BUFFER_SIZE_C,
send_buffer_c = BUFFER_SIZE_C
):
observer.Observable.__init__(self)
self.id = str(uuid.uuid4())
self.owner = owner
self.transport = transport
self.socket = socket
self.ssl = ssl
self.receive_buffer_s = receive_buffer_s
self.send_buffer_s = send_buffer_s
self.receive_buffer_c = receive_buffer_c
self.send_buffer_c = send_buffer_c

def on_socket_c(self, socket_c, address):
connection = self.build_connection(socket_c, address)
_transport = transport.TransportStream(self, connection)
self.trigger("connection", connection)

def build_connection(self, socket_c, address):
# verifies a series of pre-conditions on the socket so
# that it's ensured to be in a valid state before it's
# set as a new connection for the server (validation)
if self.ssl and not socket_c._sslobj: socket_c.close(); return

# in case the SSL mode is enabled, "patches" the socket
# object with an extra pending reference, that is going
# to be to store pending callable operations in it
if self.ssl: socket_c.pending = None

# verifies if the socket is of type internet (either ipv4
# of ipv6), this is going to be used for conditional setting
# of some of the socket options
is_inet = socket_c.family in (socket.AF_INET, socket.AF_INET6)

# sets the socket as non blocking and then updated a series
# of options in it, some of them taking into account if the
# socket if of type internet (timeout values)
socket_c.setblocking(0)
socket_c.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if is_inet: socket_c.setsockopt(
socket.IPPROTO_TCP,
socket.TCP_NODELAY,
1
)
if self.receive_buffer_c: socket_c.setsockopt(
socket.SOL_SOCKET,
socket.SO_RCVBUF,
self.receive_buffer_c
)
if self.send_buffer_c: socket_c.setsockopt(
socket.SOL_SOCKET,
socket.SO_SNDBUF,
self.send_buffer_c
)

# the process creation is considered completed and a new
# connection is created for it and opened, from this time
# on a new connection is considered accepted/created for server
connection = self.owner.build_connection(socket_c, address, ssl = self.ssl)
connection.open()

# registers the SSL handshake method as a starter method
# for the connection, so that the handshake is properly
# performed on the initial stage of the connection (as expected)
if self.ssl: connection.add_starter(self._ssl_handshake)

# runs the initial try for the handshaking process, note that
# this is an async process and further tries to the handshake
# may come after this one (async operation) in case an exception
# is raises the connection is closed (avoids possible errors)
try: connection.run_starter()
except Exception: connection.close(); raise

# in case there's extraneous data pending to be read from the
# current connection's internal receive buffer it must be properly
# handled on the risk of blocking the newly created connection
if connection.is_pending_data(): self.on_read(connection.socket)

# returns the connection that has been build already properly
# initialized and ready to be used
return connection
15 changes: 12 additions & 3 deletions src/netius/base/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,13 +368,22 @@ def serve_forever(self):
return asynchronous.coroutine_return(coroutine)

def is_serving(self):
#@todo must implement proper logic
return True

def _set_compat(self, protocol):
#@todo tenho de me registear para os eventos de nova conexao
# etc para poder encapsular e fazer connection_made
self.sockets = [self._service.socket]
self._set_binds()
self._set_protocol(protocol)

def _set_binds(self):
self._service.bind("connection", self._on_connection)

def _set_protocol(self, protocol, mark = True):
self._protocol = protocol

def _on_connection(self, connection):
transport = TransportStream(self._loop, connection)
transport._set_compat(self._protocol)

def _start_serving(self):
# in case the current context is already serving
Expand Down

0 comments on commit c942189

Please sign in to comment.