diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 00e92229..5bd651aa 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -5,7 +5,7 @@ jobs: name: Build strategy: matrix: - python-version: [2.7, 3.5, 3.6, 3.7, rc] + python-version: [2.7, 3.4, 3.5, 3.6, 3.7, 3.8, 3.9, rc] runs-on: ubuntu-latest container: python:${{ matrix.python-version }} steps: diff --git a/.gitignore b/.gitignore index 457c33ed..38e6e32e 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,8 @@ session.shelve* net.ca fs.data +/.vscode/settings.json + /dist /build /src/netius.egg-info diff --git a/examples/echo/echos_tcp.py b/examples/echo/echos_tcp.py new file mode 100644 index 00000000..ff544135 --- /dev/null +++ b/examples/echo/echos_tcp.py @@ -0,0 +1,84 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Hive Netius System +# Copyright (c) 2008-2018 Hive Solutions Lda. +# +# This file is part of Hive Netius System. +# +# Hive Netius System is free software: you can redistribute it and/or modify +# it under the terms of the Apache License as published by the Apache +# Foundation, either version 2.0 of the License, or (at your option) any +# later version. +# +# Hive Netius System is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Apache License for more details. +# +# You should have received a copy of the Apache License along with +# Hive Netius System. If not, see . + +__author__ = "João Magalhães " +""" The author(s) of the module """ + +__version__ = "1.0.0" +""" The version of the module """ + +__revision__ = "$LastChangedRevision$" +""" The revision number of the module """ + +__date__ = "$LastChangedDate$" +""" The last change date of the module """ + +__copyright__ = "Copyright (c) 2008-2018 Hive Solutions Lda." +""" The copyright for the module """ + +__license__ = "Apache License, Version 2.0" +""" The license for the module """ + +import netius + +import asyncio + +class EchoServerClientProtocol(asyncio.Protocol): + """ + Simple protocol implementation for an echo protocol + that writes back the received message through the + response pipeline. This implementation is inspired by + the Python asyncio documentation example. + + :see: https://docs.python.org/3.6/library/asyncio-protocol.html#protocol-examples + """ + + def connection_made(self, transport): + peername = transport.get_extra_info("peername") + print("Connection from %s" % str(peername)) + self.transport = transport + + def data_received(self, data): + message = data.decode() + print("Data received: %s" % message) + + print("Sending: %s" % message) + self.transport.write(data) + + print("Closing the client socket") + self.transport.close() + +loop = netius.get_loop(_compat = True) + +coro = loop.create_server( + lambda: EchoServerClientProtocol(), + "127.0.0.1", 8888 +) +server = loop.run_until_complete(coro) + +print("Serving on %s" % (server.sockets[0].getsockname(),)) + +try: loop.run_forever() +except KeyboardInterrupt: pass + +server.close() +loop.run_until_complete(server.wait_closed()) +loop.close() diff --git a/examples/echo/echos_udp.py b/examples/echo/echos_udp.py index 41ccefc7..026c235d 100644 --- a/examples/echo/echos_udp.py +++ b/examples/echo/echos_udp.py @@ -55,7 +55,7 @@ def datagram_received(self, data, addr): loop = netius.get_loop(_compat = True) listen = loop.create_datagram_endpoint( - EchoServerProtocol, + lambda: EchoServerProtocol(), local_addr = ("127.0.0.1", 9999) ) transport, protocol = loop.run_until_complete(listen) diff --git a/examples/echo/hello_http.py b/examples/echo/hello_http.py new file mode 100644 index 00000000..c29c6122 --- /dev/null +++ b/examples/echo/hello_http.py @@ -0,0 +1,78 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Hive Netius System +# Copyright (c) 2008-2018 Hive Solutions Lda. +# +# This file is part of Hive Netius System. +# +# Hive Netius System is free software: you can redistribute it and/or modify +# it under the terms of the Apache License as published by the Apache +# Foundation, either version 2.0 of the License, or (at your option) any +# later version. +# +# Hive Netius System is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Apache License for more details. +# +# You should have received a copy of the Apache License along with +# Hive Netius System. If not, see . + +__author__ = "João Magalhães " +""" The author(s) of the module """ + +__version__ = "1.0.0" +""" The version of the module """ + +__revision__ = "$LastChangedRevision$" +""" The revision number of the module """ + +__date__ = "$LastChangedDate$" +""" The last change date of the module """ + +__copyright__ = "Copyright (c) 2008-2018 Hive Solutions Lda." +""" The copyright for the module """ + +__license__ = "Apache License, Version 2.0" +""" The license for the module """ + +import netius + +import asyncio + +class HelloHTTPServerProtocol(asyncio.Protocol): + + def __init__(self, keep_alive = True): + super().__init__() + self.keep_alive = keep_alive + + def connection_made(self, transport): + self.transport = transport + self.peername = transport.get_extra_info("peername") + print("Connection from %s" % str(self.peername)) + + def connection_lost(self, exc): + print("Connection from %s lost (%s)" % (str(self.peername), str(exc))) + + def data_received(self, data): + keep_alive_s = b"keep-alive" if self.keep_alive else b"close" + self.transport.write(b"HTTP/1.1 200 OK\r\nConnection: %s\r\nContent-Type: text/plain\r\nContent-Length: 13\r\n\r\nHello, world!" % keep_alive_s) + if not self.keep_alive: self.transport.close() + +loop = netius.get_loop(_compat = True) + +coro = loop.create_server( + lambda: HelloHTTPServerProtocol(), + "127.0.0.1", 8888 +) +server = loop.run_until_complete(coro) + +print("Serving on %s" % (server.sockets[0].getsockname(),)) + +try: loop.run_forever() +except KeyboardInterrupt: pass + +server.close() +loop.run_until_complete(server.wait_closed()) +loop.close() diff --git a/examples/http/http_asyncio.py b/examples/http/http_asyncio.py index 7197125d..a85831cd 100644 --- a/examples/http/http_asyncio.py +++ b/examples/http/http_asyncio.py @@ -48,12 +48,12 @@ def print_http_headers(url, encoding = "utf-8"): url = urllib.parse.urlsplit(url) if url.scheme == "https": - connect = asyncio.open_connection(url.hostname, 443, ssl = True) + connect = asyncio.open_connection(url.hostname, url.port or 443, ssl = True) else: - connect = asyncio.open_connection(url.hostname, 80) + connect = asyncio.open_connection(url.hostname, url.port or 80) reader, writer = yield from connect - query = "HEAD {path} HTTP/1.0\r\n" + "Host: {hostname}\r\n" + "\r\n" + query = "HEAD {path} HTTP/1.1\r\nConnection: keep-alive\r\nHost: {hostname}\r\n\r\n" query = query.format(path = url.path or "/", hostname = url.hostname) writer.write(query.encode(encoding)) @@ -62,6 +62,7 @@ def print_http_headers(url, encoding = "utf-8"): if not line: break line = line.decode(encoding).rstrip() if line: print(line) + else: break writer.close() diff --git a/pylintrc b/pylintrc new file mode 100644 index 00000000..a761e1b0 --- /dev/null +++ b/pylintrc @@ -0,0 +1,2 @@ +[messages control] +disable=C0103,C0111,C0121,C0123,C0301,C0302,C0321,C0325,C0326,C0330,C0412,C0413,E1128,W0102,W0106,W0108,W0150,W0201,W0212,W0221,W0401,W0603,W0613,W0621,W0622,W0702,W1113 diff --git a/setup.py b/setup.py index cff0ca23..d6302d54 100644 --- a/setup.py +++ b/setup.py @@ -80,6 +80,7 @@ def read_file(path): "netius.mock", "netius.pool", "netius.servers", + "netius.servers.runners", "netius.sh", "netius.test" ], diff --git a/src/netius/base/__init__.py b/src/netius/base/__init__.py index a2291fc4..766c7533 100644 --- a/src/netius/base/__init__.py +++ b/src/netius/base/__init__.py @@ -50,6 +50,7 @@ from . import protocol from . import request from . import server +from . import service from . import stream from . import tls from . import transport @@ -66,8 +67,8 @@ SSL_DH_PATH, Base, BaseThread, new_loop_main, new_loop_asyncio, new_loop,\ ensure_main, ensure_asyncio, ensure_loop, get_main, get_loop, get_event_loop,\ stop_loop, compat_loop, get_poll, build_future, ensure, ensure_pool -from .compat import BaseLoop, CompatLoop, is_compat, is_asyncio, build_datagram,\ - connect_stream +from .compat import BaseLoop, CompatLoop, is_compat, is_asyncio, run, build_datagram,\ + connect_stream, serve_stream from .config import conf, conf_prefix, conf_suffix, conf_s, conf_r, conf_d, conf_ctx from .conn import OPEN, CLOSED, PENDING, CHUNK_SIZE, Connection from .container import Container, ContainerServer @@ -79,8 +80,9 @@ from .protocol import Protocol, DatagramProtocol, StreamProtocol from .request import Request, Response from .server import Server, DatagramServer, StreamServer +from .service import Service from .stream import Stream from .tls import fingerprint, match_fingerprint, match_hostname, dnsname_match,\ dump_certificate -from .transport import Transport, TransportDatagram, TransportStream +from .transport import Transport, TransportDatagram, TransportStream, ServerTransport from .util import camel_to_underscore, verify diff --git a/src/netius/base/async_neo.py b/src/netius/base/async_neo.py index a7dba586..0e6d59ee 100644 --- a/src/netius/base/async_neo.py +++ b/src/netius/base/async_neo.py @@ -258,14 +258,17 @@ def wait(*args, **kwargs): def coroutine_return(coroutine): """ Allows for the abstraction of the return value of a coroutine - object to be the result of the future yield as the first element + object to be the result of the future yielded as the last element of the generator. This allows the possibility to provide compatibility with the legacy not return allowed generators. + In case no value is yielded then an invalid value is returned as the + result of the async coroutine. + :type coroutine: CoroutineObject - :param coroutine: The coroutine object that is going to be yield back + :param coroutine: The coroutine object that is going to yield back and have its last future result returned from the generator. """ @@ -273,7 +276,22 @@ def coroutine_return(coroutine): return AwaitWrapper(generator) def _coroutine_return(coroutine): + # unsets the initial future reference value, this + # way it's possible to avoid future based return + # statement in case the coroutine is "empty" + future = None + + # iterates over the coroutine generator yield + # the values (should be future instances) and + # ignoring possible invalid values, the last + # value yielded is considered to be future + # that is going to be set as the return value for value in coroutine: + if value == None: continue yield value future = value - return future.result() + + # in case at least one valid future was yielded + # then set the result of such future as the result + # of the current coroutine (return statement) + return future.result() if future else None diff --git a/src/netius/base/async_old.py b/src/netius/base/async_old.py index 88a2ee4e..a5f48c1d 100644 --- a/src/netius/base/async_old.py +++ b/src/netius/base/async_old.py @@ -59,6 +59,8 @@ class Future(object): :see: https://en.wikipedia.org/wiki/Futures_and_promises """ + _asyncio_future_blocking = True + def __init__(self, loop = None): self.status = 0 self._loop = loop @@ -117,6 +119,18 @@ def add_ready_callback(self, function): def add_closed_callback(self, function): self.closed_callbacks.append(function) + def remove_done_callback(self, function): + self.done_callbacks.remove(function) + + def remove_partial_callback(self, function): + self.partial_callbacks.remove(function) + + def remove_ready_callback(self, function): + self.ready_callbacks.remove(function) + + def remove_closed_callback(self, function): + self.closed_callbacks.remove(function) + def approve(self, cleanup = True): self.set_result(None, cleanup = cleanup) @@ -336,15 +350,20 @@ def notify(event, data = None): def coroutine_return(coroutine): """ Allows for the abstraction of the return value of a coroutine - object to be the result of the future yield as the first element + object to be the result of the future yielded as the last element of the generator. This allows the possibility to provide compatibility with the legacy not return allowed generators. + In case no value is yielded then an invalid value is returned as the + result of the async coroutine. + :type coroutine: CoroutineObject - :param coroutine: The coroutine object that is going to be yield back + :param coroutine: The coroutine object that is going to yield back and have its last future result returned from the generator. """ - for value in coroutine: yield value + for value in coroutine: + if value == None: continue + yield value diff --git a/src/netius/base/asynchronous.py b/src/netius/base/asynchronous.py index e11590a1..2ceae7cd 100644 --- a/src/netius/base/asynchronous.py +++ b/src/netius/base/asynchronous.py @@ -40,10 +40,10 @@ # imports the base (old) version of the async implementation # that should be compatible with all the available python # interpreters, base collection of async library -from .async_old import * #@UnusedWildImport +from .async_old import * #@UnusedWildImport pylint: disable=W0614 # verifies if the current python interpreter version supports # the new version of the async implementation and if that's the # case runs the additional import of symbols, this should override # most of the symbols that have just been created -if is_neo(): from .async_neo import * #@UnusedWildImport +if is_neo(): from .async_neo import * #@UnusedWildImport pylint: disable=W0614 diff --git a/src/netius/base/client.py b/src/netius/base/client.py index 065f1c90..02a82ac7 100644 --- a/src/netius/base/client.py +++ b/src/netius/base/client.py @@ -317,9 +317,9 @@ def ensure_socket(self): # as nothing else remain to be done in the current method if self.socket: return - # prints a small debug message about the udp socket that is going + # prints a small debug message about the UDP socket that is going # to be created for the client's connection - self.debug("Creating clients's udp socket ...") + self.debug("Creating clients's UDP socket ...") # creates the socket that it's going to be used for the listening # of new connections (client socket) and sets it as non blocking @@ -689,13 +689,13 @@ def connect( # ensures that the proper socket family is defined in case the # requested host value is unix socket oriented, this step greatly # simplifies the process of created unix socket based clients - family = socket.AF_UNIX if host == "unix" else family + family = socket.AF_UNIX if host == "unix" else family #@UndefinedVariable pylint: disable=E1101 # verifies the kind of socket that is going to be used for the # connect operation that is going to be performed, note that the # unix type should be used with case as it does not exist in every # operative system and may raised an undefined exceptions - is_unix = hasattr(socket, "AF_UNIX") and family == socket.AF_UNIX + is_unix = hasattr(socket, "AF_UNIX") and family == socket.AF_UNIX #@UndefinedVariable pylint: disable=E1101 is_inet = family in (socket.AF_INET, socket.AF_INET6) # runs a series of default operation for the SSL related attributes diff --git a/src/netius/base/common.py b/src/netius/base/common.py index 5894bf6f..e75091b0 100644 --- a/src/netius/base/common.py +++ b/src/netius/base/common.py @@ -57,9 +57,10 @@ from .. import middleware -from .conn import * #@UnusedWildImport -from .poll import * #@UnusedWildImport -from .asynchronous import * #@UnusedWildImport +from .conn import * #@UnusedWildImport pylint: disable=W0614 +from .poll import * #@UnusedWildImport pylint: disable=W0614 +from .service import * #@UnusedWildImport pylint: disable=W0614 +from .asynchronous import * #@UnusedWildImport pylint: disable=W0614 NAME = "netius" """ The global infra-structure name to be used in the @@ -199,7 +200,7 @@ netius communication infra-structure """ UDP_TYPE = 2 -""" The datagram based udp protocol enumeration value to be used +""" The datagram based UDP protocol enumeration value to be used in static references to this kind of socket usage """ STATE_STOP = 1 @@ -361,6 +362,7 @@ def __init__(self, name = None, handlers = None, *args, **kwargs): self._forked = False self._child = False self._concrete = False + self._services = {} self._childs = [] self._events = {} self._notified = [] @@ -422,11 +424,22 @@ def get_asyncio(cls): return asyncio.get_event_loop() @classmethod - def set_main(cls, instance, set_compat = True): + def set_main(cls, instance, set_compat = True, set_running = True): + # encapsulates the current event loop instance with a compatibility + # layer (with asyncio) and then updates both the global reference + # to the Netius event loop and the compatibility version of it compat = compat_loop(instance) cls._MAIN = instance cls._MAIN_C = compat + + # if the compatibility layer for setting the main event loop + # is not requested (updating asyncio to make sure it returns + # the current loop as the main one) then there's nothing else + # remaining to be done, returns the control flow if not set_compat: return + + # tries to obtain the asyncio reference in case there's + # none available returns immediately (not possible to set main) asyncio = asynchronous.get_asyncio() if not asyncio: return @@ -440,6 +453,15 @@ def set_main(cls, instance, set_compat = True): # current compatible version of the netius loop asyncio.set_event_loop(compat) + # in case the set running flag is set indicating that + # the currently running loop should also be set then + # updates the currently running loop in the asyncio + # infrastructure, so that the next time an executor + # requests the currently running loop with `get_running_loop` + # the current compat loop is the one they get + if set_running and hasattr(asyncio, "_set_running_loop"): + asyncio._set_running_loop(compat) + @classmethod def unset_main(cls, set_compat = True): cls.set_main(None, set_compat = set_compat) @@ -755,7 +777,7 @@ def ensure( # creates the coroutine that is going to be used to # encapsulate the callable, note that the result of the # callable is set as the result of the future (as expected) - def coroutine(future, *args, **kwargs): + def coroutine(future, *args, **kwargs): #pylint ignore=E0102 yield result = coroutine_c(*args, **kwargs) future.set_result(result) @@ -1444,8 +1466,8 @@ def bind_signals( signals = ( signal.SIGINT, signal.SIGTERM, - signal.SIGHUP if hasattr(signal, "SIGHUP") else None, #@UndefinedVariable - signal.SIGQUIT if hasattr(signal, "SIGQUIT") else None #@UndefinedVariable + signal.SIGHUP if hasattr(signal, "SIGHUP") else None, #@UndefinedVariable pylint: disable=E1101 + signal.SIGQUIT if hasattr(signal, "SIGQUIT") else None #@UndefinedVariable pylint: disable=E1101 ), handler = None ): @@ -1862,7 +1884,7 @@ def pipe_send(message): # iterates of the requested (number of children) to run # the concrete fork operation and fork the logic for _index in range(self.children): - pid = os.fork() #@UndefinedVariable + pid = os.fork() #@UndefinedVariable pylint: disable=E1101 self._child = pid == 0 if self._child: self.on_child(pipe = pipe_send) if self._child: break @@ -2039,7 +2061,12 @@ def reads(self, reads, state = True): # iterates over all of the read events and calls the proper on # read method handler to properly handle each event - for read in reads: self.on_read(read) + for read in reads: + if read in self._services: + service = self._services[read] + self.on_read_s(read, service) + else: + self.on_read(read) def writes(self, writes, state = True): # in case the update state is requested updates the current loop @@ -2069,6 +2096,209 @@ def errors(self, errors, state = True): # error method handler to properly handle each event for error in errors: self.on_error(error) + def serve( + self, + host = None, + port = 9090, + type = TCP_TYPE, + ipv6 = False, + ssl = False, + key_file = None, + cer_file = None, + ca_file = None, + ca_root = True, + ssl_verify = False, + ssl_host = None, + ssl_fingerprint = None, + ssl_dump = False, + family = socket.AF_INET, + backlog = socket.SOMAXCONN, + env = False, + callback = None + ): + # ensures the proper default address value, taking into account + # the type of connection that is currently being used, this avoids + # problems with multiple stack based servers (IPv4 and ipv6) + if host == None: host = "::1" if ipv6 else "127.0.0.1" + + # defaults the provided ssl key and certificate paths to the + # ones statically defined (dummy certificates), please beware + # that using these certificates may create validation problems + key_file = key_file or SSL_KEY_PATH + cer_file = cer_file or SSL_CER_PATH + ca_file = ca_file or SSL_CA_PATH + + # determines if the client side certificate should be verified + # according to the loaded certificate authority values or if + # on the contrary no (client) validation should be performed + ssl_verify = ssl_verify or False + + # verifies if the type of server that is going to be created is + # unix or internet based, this allows the current infra-structure + # to work under the much more latency free unix sockets + is_unix = host == "unix" + + # checks the type of service that is meant to be created and + # creates a service socket according to the defined service + family = socket.AF_INET6 if ipv6 else socket.AF_INET + family = socket.AF_UNIX if is_unix else family #@UndefinedVariable pylint: disable=E1101 + if type == TCP_TYPE: + _socket = self.socket_tcp( + ssl, + key_file = key_file, + cer_file = cer_file, + ca_file = ca_file, + ca_root = ca_root, + ssl_verify = ssl_verify, + family = family + ) + elif type == UDP_TYPE: + _socket = self.socket_udp() + else: + raise errors.NetiusError("Invalid server type provided '%d'" % type) + + # "calculates" the address "bind target", taking into account that this + # server may be running under a unix based socket infra-structure and + # if that's the case the target (file path) is also removed, avoiding + # a duplicated usage of the socket (required for address re-usage) + address = port if is_unix else (host, port) + if is_unix and os.path.exists(address): os.remove(address) + + # binds the socket to the provided address value (per spec) and then + # starts the listening in the socket with the provided backlog value + # defaulting to the typical maximum backlog as possible if not provided + _socket.bind(address) + if type == TCP_TYPE: _socket.listen(backlog) + + # in case the selected port is zero based, meaning that a randomly selected + # port has been assigned by the bind operation the new port must be retrieved + # and set for the current server instance as the new port (for future reference) + if port == 0: port = _socket.getsockname()[1] + + # creates the string that identifies it the current service connection + # is using a secure channel (ssl) and then prints an info message about + # the service that is going to be started + ipv6_s = " on ipv6" if ipv6 else "" + ssl_s = " using ssl" if ssl else "" + self.info("Serving '%s' service on %s:%s%s%s ..." % (self.name, host, port, ipv6_s, ssl_s)) + + # ensures that the current polling mechanism is correctly open as the + # service socket is going to be added to it next, this overrides the + # default behavior of the common infra-structure (on start) + self.poll = self.build_poll() + self.poll.open(timeout = self.poll_timeout) + + # adds the socket to all of the pool lists so that it's ready to read + # write and handle error, this is the expected behavior of a service + # socket so that it can handle all of the expected operations + self.sub_all(_socket) + + # calls the on serve callback handler so that underlying services may be + # able to respond to the fact that the service is starting and some of + # them may print some specific debugging information + self.on_serve() + + # creates a new service instance that is going to represents the new server + # that is going to start accepting new connections + service = self.new_service(_socket, ssl) + + # delays the operation of calling the callback with the new (service) instance + # by one tick (avoids possible issues) + self.delay( + lambda: callback and callback(service, True), + immediately = True + ) + + def socket_tcp( + self, + ssl = False, + key_file = None, + cer_file = None, + ca_file = None, + ca_root = True, + ssl_verify = False, + family = socket.AF_INET, + type = socket.SOCK_STREAM, + receive_buffer = None, + send_buffer = None + ): + # verifies if the provided family is of type internet and if that's + # the case the associated flag is set to valid for usage + is_inet = family in (socket.AF_INET, socket.AF_INET6) + + # retrieves the proper string based type for the current server socket + # and the prints a series of log message about the socket to be created + type_s = " SSL" if ssl else "" + self.debug("Creating server's TCP%s socket ..." % type_s) + if ssl: self.debug("Loading '%s' as key file" % key_file) + if ssl: self.debug("Loading '%s' as certificate file" % cer_file) + if ssl and ca_file: self.debug("Loading '%s' as certificate authority file" % ca_file) + if ssl and ssl_verify: self.debug("Loading with client SSL verification") + + # creates the socket that it's going to be used for the listening + # of new connections (server socket) and sets it as non blocking + _socket = socket.socket(family, type) + _socket.setblocking(0) + + # in case the server is meant to be used as SSL wraps the socket + # in suck fashion so that it becomes "secured" + if ssl: _socket = self._ssl_wrap( + _socket, + key_file = key_file, + cer_file = cer_file, + ca_file = ca_file, + ca_root = ca_root, + server = True, + ssl_verify = ssl_verify + ) + + # sets the various options in the service socket so that it becomes + # ready for the operation with the highest possible performance, these + # options include the reuse address to be able to re-bind to the port + # and address and the keep alive that drops connections after some time + # avoiding the leak of connections (operative system managed) + _socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + _socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + if is_inet: _socket.setsockopt( + socket.IPPROTO_TCP, + socket.TCP_NODELAY, + 1 + ) + if receive_buffer: _socket.setsockopt( + socket.SOL_SOCKET, + socket.SO_RCVBUF, + receive_buffer + ) + if send_buffer: _socket.setsockopt( + socket.SOL_SOCKET, + socket.SO_SNDBUF, + send_buffer + ) + self._socket_keepalive(_socket) + + # returns the created tcp socket to the calling method so that it + # may be used from this point on + return _socket + + def socket_udp(self, family = socket.AF_INET, type = socket.SOCK_DGRAM): + # prints a small debug message about the UDP socket that is going + # to be created for the server's connection + self.debug("Creating server's UDP socket ...") + + # creates the socket that it's going to be used for the listening + # of new connections (server socket) and sets it as non blocking + _socket = socket.socket(family, type) + _socket.setblocking(0) + + # sets the various options in the service socket so that it becomes + # ready for the operation with the highest possible performance + _socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + _socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + + # returns the created UDP socket to the calling method so that it + # may be used from this point on + return _socket + def datagram( self, family = socket.AF_INET, @@ -2187,13 +2417,13 @@ def connect( # ensures that the proper socket family is defined in case the # requested host value is unix socket oriented, this step greatly # simplifies the process of created unix socket based clients - family = socket.AF_UNIX if host == "unix" else family + family = socket.AF_UNIX if host == "unix" else family #@UndefinedVariable pylint: disable=E1101 # verifies the kind of socket that is going to be used for the # connect operation that is going to be performed, note that the # unix type should be used with case as it does not exist in every # operative system and may raised an undefined exceptions - is_unix = hasattr(socket, "AF_UNIX") and family == socket.AF_UNIX + is_unix = hasattr(socket, "AF_UNIX") and family == socket.AF_UNIX #@UndefinedVariable pylint: disable=E1101 is_inet = family in (socket.AF_INET, socket.AF_INET6) # runs a series of default operation for the SSL related attributes @@ -2661,15 +2891,47 @@ def on_error(self, _socket): connection.close() + def on_read_s(self, _socket, service): + try: + while True: + socket_c, address = _socket.accept() + try: service.on_socket_c(socket_c, address) + except Exception: socket_c.close(); raise + except ssl.SSLError as error: + error_v = error.args[0] if error.args else None + error_m = error.reason if hasattr(error, "reason") else None + if error_v in SSL_SILENT_ERRORS: + self.on_expected_s(error) + elif not error_v in SSL_VALID_ERRORS and\ + not error_m in SSL_VALID_REASONS: + self.on_exception_s(error) + except socket.error as error: + error_v = error.args[0] if error.args else None + if error_v in SILENT_ERRORS: + self.on_expected_s(error) + elif not error_v in VALID_ERRORS: + self.on_exception_s(error) + except (KeyboardInterrupt, SystemExit): + raise + except BaseException as exception: + self.on_exception_s(exception) + def on_exception(self, exception, connection): self.warning(exception) self.log_stack() connection.close() + def on_exception_s(self, exception): + self.warning(exception) + self.log_stack() + def on_expected(self, exception, connection): self.debug(exception) connection.close() + def on_expected_s(self, exception): + self.debug(exception) + def on_connect(self, connection): connection.set_connected() if not hasattr(connection, "tuple"): return @@ -2714,6 +2976,9 @@ def on_data(self, connection, data): def on_data_base(self, connection, data): connection.set_data(data) + def on_serve(self): + pass + def info_dict(self, full = False): info = dict( loaded = self._loaded, @@ -2792,6 +3057,77 @@ def build_connection( ssl = ssl ) + def build_connection_client( + self, + socket_c, + address, + ssl = False, + receive_buffer_c = None, + send_buffer_c = None + ): + # verifies a series of pre-conditions on the socket so + # that it's ensured to be in a valid state before it's + # set as a new connection for the server (validation) + if ssl and not socket_c._sslobj: socket_c.close(); return + + # in case the SSL mode is enabled, "patches" the socket + # object with an extra pending reference, that is going + # to be to store pending callable operations in it + if ssl: socket_c.pending = None + + # verifies if the socket is of type internet (either IPv4 + # of ipv6), this is going to be used for conditional setting + # of some of the socket options + is_inet = socket_c.family in (socket.AF_INET, socket.AF_INET6) + + # sets the socket as non blocking and then updated a series + # of options in it, some of them taking into account if the + # socket if of type internet (timeout values) + socket_c.setblocking(0) + socket_c.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + if is_inet: socket_c.setsockopt( + socket.IPPROTO_TCP, + socket.TCP_NODELAY, + 1 + ) + if receive_buffer_c: socket_c.setsockopt( + socket.SOL_SOCKET, + socket.SO_RCVBUF, + receive_buffer_c + ) + if send_buffer_c: socket_c.setsockopt( + socket.SOL_SOCKET, + socket.SO_SNDBUF, + send_buffer_c + ) + + # the process creation is considered completed and a new + # connection is created for it and opened, from this time + # on a new connection is considered accepted/created for server + connection = self.build_connection(socket_c, address, ssl = ssl) + connection.open() + + # registers the SSL handshake method as a starter method + # for the connection, so that the handshake is properly + # performed on the initial stage of the connection (as expected) + if ssl: connection.add_starter(self._ssl_handshake) + + # runs the initial try for the handshaking process, note that + # this is an async process and further tries to the handshake + # may come after this one (async operation) in case an exception + # is raises the connection is closed (avoids possible errors) + try: connection.run_starter() + except Exception: connection.close(); raise + + # in case there's extraneous data pending to be read from the + # current connection's internal receive buffer it must be properly + # handled on the risk of blocking the newly created connection + if connection.is_pending_data(): self.on_read(connection.socket) + + # returns the connection that has been build already properly + # initialized and ready to be used + return connection + def base_connection(self, *args, **kwargs): connection = Base.build_connection(self, *args, **kwargs) connection._base = True @@ -2807,6 +3143,11 @@ def del_connection(self, connection): return Base.on_connection_d(self, connection) return self.on_connection_d(connection) + def new_service(self, socket, ssl = False): + service = Service(owner = self, socket = socket, ssl = ssl) + self._services[socket] = service + return service + def add_callback(self, socket, callback): callbacks = self.callbacks_m.get(socket, []) if callback in callbacks: return @@ -3537,27 +3878,27 @@ def _socket_keepalive( if count == None: count = self.keepalive_count is_inet = _socket.family in (socket.AF_INET, socket.AF_INET6) is_inet and hasattr(_socket, "TCP_KEEPIDLE") and\ - self.socket.setsockopt( + _socket.setsockopt( socket.IPPROTO_TCP, - socket.TCP_KEEPIDLE, #@UndefinedVariable + socket.TCP_KEEPIDLE, #@UndefinedVariable pylint: disable=E1101 timeout ) is_inet and hasattr(_socket, "TCP_KEEPINTVL") and\ - self.socket.setsockopt( + _socket.setsockopt( socket.IPPROTO_TCP, - socket.TCP_KEEPINTVL, #@UndefinedVariable + socket.TCP_KEEPINTVL, #@UndefinedVariable pylint: disable=E1101 interval ) is_inet and hasattr(_socket, "TCP_KEEPCNT") and\ - self.socket.setsockopt( + _socket.setsockopt( socket.IPPROTO_TCP, - socket.TCP_KEEPCNT, #@UndefinedVariable + socket.TCP_KEEPCNT, #@UndefinedVariable pylint: disable=E1101 count ) hasattr(_socket, "SO_REUSEPORT") and\ - self.socket.setsockopt( + _socket.setsockopt( socket.SOL_SOCKET, - socket.SO_REUSEPORT, #@UndefinedVariable + socket.SO_REUSEPORT, #@UndefinedVariable pylint: disable=E1101 1 ) @@ -4074,10 +4415,20 @@ def run(self): self.owner._thread = None self.owner = None -def new_loop_main(factory = None, _compat = None, **kwargs): - factory = factory or Base +def new_loop_main(factory = None, env = True, _compat = None, **kwargs): kwargs["_slave"] = kwargs.pop("_slave", True) + + # obtains the factory method defaulting to the Netius base event loop + # constructor and creates a new instance of the event loop + factory = factory or Base instance = factory(**kwargs) + + # in case the loading of the environment variables has been requested + # and the provided event loop instance is compatible with environment + # binding operation, then performs the operation + if env and hasattr(instance, "bind_env"): + instance.bind_env() + return compat_loop(instance) if _compat else instance def new_loop_asyncio(**kwargs): @@ -4091,10 +4442,23 @@ def new_loop(factory = None, _compat = None, asyncio = None, **kwargs): if asyncio: return new_loop_asyncio(**kwargs) else: return new_loop_main(factory = factory, _compat = _compat, **kwargs) -def ensure_main(factory = None, **kwargs): +def ensure_main(factory = None, env = True, **kwargs): if Base.get_main(): return + + # obtains the factory method defaulting to the Netius base event loop + # constructor and creates a new instance of the event loop factory = factory or Base instance = factory(**kwargs) + + # in case the loading of the environment variables has been requested + # and the provided event loop instance is compatible with environment + # binding operation, then performs the operation + if env and hasattr(instance, "bind_env"): + instance.bind_env() + + # updates the reference to the main event loop under the Netius perspective + # so that global variables point to this event loop, possible compatibility + # mode rules may apply Base.set_main(instance) def ensure_asyncio(**kwargs): diff --git a/src/netius/base/compat.py b/src/netius/base/compat.py index e933204b..0d39e83e 100644 --- a/src/netius/base/compat.py +++ b/src/netius/base/compat.py @@ -213,6 +213,49 @@ def _run_in_executor(self, executor, func, *args): future = executor.submit(func, *args) yield future + def _create_server( + self, + protocol_factory, + host = None, + port = None, + family = 0, + flags = 0, + sock = None, + backlog = 100, + ssl = None, + reuse_address = None, + reuse_port = None, + *args, + **kwargs + ): + family = family or socket.AF_INET + + future = self.create_future() + + def on_complete(service, success): + if success: on_success(service) + else: on_error(service) + + def on_success(service): + server = transport.ServerTransport(self, service) + server._set_compat(protocol_factory) + future.set_result(server) + + def on_error(connection): + future.set_exception( + errors.RuntimeError("Connection issue") + ) + + self._loop.serve( + host, + port, + ssl = ssl, + family = family, + callback = on_complete + ) + + yield future + def _create_connection( self, protocol_factory, @@ -305,15 +348,51 @@ def on_error(connection): self._loop.delay(lambda: on_complete(connection, True)) yield future + def _start_serving( + self, + protocol_factory, + sock, + sslcontext = None, + server = None, + backlog = 100, + ssl_handshake_timeout = None + ): + # @todo this is pending proper Netius implementation + self._add_reader( + sock.fileno(), + self._accept_connection, + protocol_factory, + sock, + sslcontext, + server, + backlog, + ssl_handshake_timeout + ) + def _set_current_task(self, task): + """ + Updates the currently executing task in the global + asyncio state, remember that only one task can be + running per each event loop. + + :type task: Task + :param task: The task object that is going to be set + as the currently running task. + """ + asyncio = asynchronous.get_asyncio() if not asyncio: return - asyncio.Task._current_tasks[self] = task + self._current_tasks[self] = task def _unset_current_task(self): + """ + Removes the currently running task for the current + event loop (pop operation). + """ + asyncio = asynchronous.get_asyncio() if not asyncio: return - asyncio.Task._current_tasks.pop(self, None) + self._current_tasks.pop(self, None) def _call_delay( self, @@ -370,6 +449,12 @@ def _default_handler(self, context): def _thread_id(self): return self._loop.tid + @property + def _current_tasks(self): + if hasattr(asyncio.tasks, "_current_tasks"): + return asyncio.tasks._current_tasks + return asyncio.Task._current_tasks + def is_compat(): """ Determines if the compatibility mode for the netius @@ -405,6 +490,11 @@ def is_asyncio(): asyncio = config.conf("ASYNCIO", False, cast = bool) return asyncio and asynchronous.is_asynclib() +def run(coro): + from . import common + loop = common.get_loop(_compat = True) + loop.run_until_complete(coro) + def build_datagram(*args, **kwargs): if is_compat(): return _build_datagram_compat(*args, **kwargs) else: return _build_datagram_native(*args, **kwargs) @@ -413,6 +503,10 @@ def connect_stream(*args, **kwargs): if is_compat(): return _connect_stream_compat(*args, **kwargs) else: return _connect_stream_native(*args, **kwargs) +def serve_stream(*args, **kwargs): + if is_compat(): return _serve_stream_compat(*args, **kwargs) + else: return _serve_stream_native(*args, **kwargs) + def _build_datagram_native( protocol_factory, family = socket.AF_INET, @@ -424,6 +518,19 @@ def _build_datagram_native( *args, **kwargs ): + """ + Builds a datagram assuming that the current event + loop in execution is a Netius one and that the support + for the Netius specific methods exist. + + This method is typically faster than using the compat + one which only makes use of the asyncio API. + + The end goal of this method is to call the callback method + with a tuple containing both the transport and the protocol + for the requested datagram based "connection". + """ + from . import common loop = loop or common.get_loop() @@ -449,7 +556,7 @@ def on_connect(connection): _transport = transport.TransportDatagram(loop, connection) _transport._set_compat(protocol) if not callback: return - callback((_transport, protocol)) + if callback: callback((_transport, protocol)) def on_error(connection): protocol.close() @@ -485,7 +592,7 @@ def on_connect(future): protocol.close() else: result = future.result() - callback and callback(result) + if callback: callback(result) remote_addr = (remote_host, remote_port) if\ remote_host and remote_port else kwargs.pop("remote_addr", None) @@ -520,6 +627,19 @@ def _connect_stream_native( *args, **kwargs ): + """ + Runs the connect operation for a given stream using the internal + Netius based strategy, meaning that the underlying structures + involved should include connection and the base Netius event loop + methods should be used. + + The end goal of this function is to call the provided callback + with a tuple containing both a transport and a protocol instance. + + This callback should only be called once a proper connection has + been established. + """ + from . import common loop = loop or common.get_loop() @@ -551,7 +671,7 @@ def on_connect(connection): _transport = transport.TransportStream(loop, connection) _transport._set_compat(protocol) if not callback: return - callback((_transport, protocol)) + if callback: callback((_transport, protocol)) def on_error(connection): protocol.close() @@ -593,7 +713,7 @@ def on_connect(future): protocol.close() else: result = future.result() - callback and callback(result) + if callback: callback(result) if ssl and cer_file and key_file: import ssl as _ssl @@ -615,3 +735,78 @@ def on_connect(future): future.add_done_callback(on_connect) return loop + +def _serve_stream_native( + protocol_factory, + host, + port, + ssl = False, + key_file = None, + cer_file = None, + ca_file = None, + ca_root = True, + ssl_verify = False, + family = socket.AF_INET, + type = socket.SOCK_STREAM, + backlog = None, + reuse_address = None, + reuse_port = None, + callback = None, + loop = None, + *args, + **kwargs +): + from . import common + + loop = loop or common.get_loop() + + protocol = protocol_factory() + has_loop_set = hasattr(protocol, "loop_set") + if has_loop_set: protocol.loop_set(loop) + + def on_ready(): + loop.serve( + host = host, + port = port, + callback = on_complete + ) + + def on_complete(service, success): + if success: on_success(service) + else: on_error(service) + + def on_success(service): + server = transport.ServerTransport(loop, service) + server._set_compat(protocol) + if not callback: return + callback(server) + + def on_error(connection): + protocol.close() + + loop.delay(on_ready) + + return loop + +def _serve_stream_compat( + protocol_factory, + host, + port, + ssl = False, + key_file = None, + cer_file = None, + ca_file = None, + ca_root = True, + ssl_verify = False, + family = socket.AF_INET, + type = socket.SOCK_STREAM, + backlog = None, + reuse_address = None, + reuse_port = None, + callback = None, + loop = None, + *args, + **kwargs +): + #@todo: implement this stuff + pass diff --git a/src/netius/base/config.py b/src/netius/base/config.py index 11abb07f..4868b950 100644 --- a/src/netius/base/config.py +++ b/src/netius/base/config.py @@ -92,10 +92,8 @@ to be the home on in terms of configuration, this value should be set on the initial loading of the ".home" file """ -__builtins__ = __builtins__ if isinstance(__builtins__, dict) else\ - __builtins__.__dict__ -""" The global builtins reference created by the proper redefinition -of the variable if that's required by python implementation """ +if not isinstance(__builtins__, dict): #pylint: disable=E0601 + __builtins__ = __builtins__.__dict__ def conf(name, default = None, cast = None, ctx = None): """ diff --git a/src/netius/base/container.py b/src/netius/base/container.py index 12f8b4a7..b1f9402c 100644 --- a/src/netius/base/container.py +++ b/src/netius/base/container.py @@ -39,7 +39,7 @@ from . import server -from .common import * #@UnusedWildImport +from .common import * #@UnusedWildImport pylint: disable=W0614 class Container(Base): diff --git a/src/netius/base/poll.py b/src/netius/base/poll.py index cd7885a9..9078a246 100644 --- a/src/netius/base/poll.py +++ b/src/netius/base/poll.py @@ -88,7 +88,7 @@ def close(self): self.error_o.clear() def poll(self): - return [] + return ([], [], []) def poll_owner(self): reads, writes, errors = self.poll() @@ -188,7 +188,7 @@ def open(self, timeout = POLL_TIMEOUT): self._open = True self.timeout = timeout - self.epoll = select.epoll() #@UndefinedVariable + self.epoll = select.epoll() #@UndefinedVariable pylint: disable=E1101 self.fd_m = {} @@ -215,13 +215,13 @@ def poll(self): events = self.epoll.poll(self.timeout) for fd, event in events: - if event & select.EPOLLIN: #@UndefinedVariable + if event & select.EPOLLIN: #@UndefinedVariable pylint: disable=E1101 socket = self.fd_m.get(fd, None) socket and result[0].append(socket) - if event & select.EPOLLOUT: #@UndefinedVariable + if event & select.EPOLLOUT: #@UndefinedVariable pylint: disable=E1101 socket = self.fd_m.get(fd, None) socket and result[1].append(socket) - if event & select.EPOLLERR or event & select.EPOLLHUP: #@UndefinedVariable + if event & select.EPOLLERR or event & select.EPOLLHUP: #@UndefinedVariable pylint: disable=E1101 socket = self.fd_m.get(fd, None) socket and result[2].append(socket) @@ -237,9 +237,9 @@ def sub_read(self, socket, owner = None): self.read_o[socket] = owner self.write_o[socket] = owner self.error_o[socket] = owner - self.epoll.register( #@UndefinedVariable + self.epoll.register( #@UndefinedVariable pylint: disable=E1101 socket_fd, - select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP | select.EPOLLET #@UndefinedVariable + select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP | select.EPOLLET #@UndefinedVariable pylint: disable=E1101 ) def sub_write(self, socket, owner = None): @@ -251,7 +251,7 @@ def sub_error(self, socket, owner = None): def unsub_read(self, socket): if not socket in self.read_o: return socket_fd = socket.fileno() - self.epoll.unregister( #@UndefinedVariable + self.epoll.unregister( #@UndefinedVariable pylint: disable=E1101 socket_fd ) del self.fd_m[socket_fd] @@ -281,7 +281,7 @@ def open(self, timeout = POLL_TIMEOUT): self.timeout = timeout if self.timeout < 0: self.timeout = None - self.kqueue = select.kqueue() #@UndefinedVariable + self.kqueue = select.kqueue() #@UndefinedVariable pylint: disable=E1101 self.fd_m = {} @@ -307,16 +307,16 @@ def poll(self): events = self.kqueue.control(None, 32, self.timeout) for event in events: - if event.flags & select.KQ_EV_ERROR: #@UndefinedVariable + if event.flags & select.KQ_EV_ERROR: #@UndefinedVariable pylint: disable=E1101 socket = self.fd_m.get(event.udata, None) socket and result[2].append(socket) - elif event.filter == select.KQ_FILTER_READ: #@UndefinedVariable + elif event.filter == select.KQ_FILTER_READ: #@UndefinedVariable pylint: disable=E1101 socket = self.fd_m.get(event.udata, None) - index = 2 if event.flags & select.KQ_EV_EOF else 0 #@UndefinedVariable + index = 2 if event.flags & select.KQ_EV_EOF else 0 #@UndefinedVariable pylint: disable=E1101 socket and result[index].append(socket) - elif event.filter == select.KQ_FILTER_WRITE: #@UndefinedVariable + elif event.filter == select.KQ_FILTER_WRITE: #@UndefinedVariable pylint: disable=E1101 socket = self.fd_m.get(event.udata, None) - index = 2 if event.flags & select.KQ_EV_EOF else 1 #@UndefinedVariable + index = 2 if event.flags & select.KQ_EV_EOF else 1 #@UndefinedVariable pylint: disable=E1101 socket and result[index].append(socket) return result @@ -331,17 +331,17 @@ def sub_read(self, socket, owner = None): self.read_o[socket] = owner self.write_o[socket] = owner self.error_o[socket] = owner - event = select.kevent( #@UndefinedVariable + event = select.kevent( #@UndefinedVariable pylint: disable=E1101 socket_fd, - filter = select.KQ_FILTER_READ, #@UndefinedVariable - flags = select.KQ_EV_ADD | select.KQ_EV_CLEAR, #@UndefinedVariable + filter = select.KQ_FILTER_READ, #@UndefinedVariable pylint: disable=E1101 + flags = select.KQ_EV_ADD | select.KQ_EV_CLEAR, #@UndefinedVariable pylint: disable=E1101 udata = socket_fd ) self.kqueue.control([event], 0) - event = select.kevent( #@UndefinedVariable + event = select.kevent( #@UndefinedVariable pylint: disable=E1101 socket_fd, - filter = select.KQ_FILTER_WRITE, #@UndefinedVariable - flags = select.KQ_EV_ADD | select.KQ_EV_CLEAR, #@UndefinedVariable + filter = select.KQ_FILTER_WRITE, #@UndefinedVariable pylint: disable=E1101 + flags = select.KQ_EV_ADD | select.KQ_EV_CLEAR, #@UndefinedVariable pylint: disable=E1101 udata = socket_fd ) self.kqueue.control([event], 0) @@ -355,16 +355,16 @@ def sub_error(self, socket, owner = None): def unsub_read(self, socket): if not socket in self.read_o: return socket_fd = socket.fileno() - event = select.kevent( #@UndefinedVariable + event = select.kevent( #@UndefinedVariable pylint: disable=E1101 socket_fd, - filter = select.KQ_FILTER_READ, #@UndefinedVariable - flags = select.KQ_EV_DELETE #@UndefinedVariable + filter = select.KQ_FILTER_READ, #@UndefinedVariable pylint: disable=E1101 + flags = select.KQ_EV_DELETE #@UndefinedVariable pylint: disable=E1101 ) self.kqueue.control([event], 0) - event = select.kevent( #@UndefinedVariable + event = select.kevent( #@UndefinedVariable pylint: disable=E1101 socket_fd, - filter = select.KQ_FILTER_WRITE, #@UndefinedVariable - flags = select.KQ_EV_DELETE #@UndefinedVariable + filter = select.KQ_FILTER_WRITE, #@UndefinedVariable pylint: disable=E1101 + flags = select.KQ_EV_DELETE #@UndefinedVariable pylint: disable=E1101 ) self.kqueue.control([event], 0) del self.fd_m[socket_fd] @@ -393,7 +393,7 @@ def open(self, timeout = POLL_TIMEOUT): self._open = True self.timeout = timeout - self._poll = select.poll() #@UndefinedVariable + self._poll = select.poll() #@UndefinedVariable pylint: disable=E1101 self.read_fd = {} self.write_fd = {} @@ -421,13 +421,13 @@ def poll(self): events = self._poll.poll(self.timeout * 1000) for fd, event in events: - if event & select.POLLIN: #@UndefinedVariable + if event & select.POLLIN: #@UndefinedVariable pylint: disable=E1101 socket = self.read_fd.get(fd, None) socket and result[0].append(socket) - if event & select.POLLOUT: #@UndefinedVariable + if event & select.POLLOUT: #@UndefinedVariable pylint: disable=E1101 socket = self.write_fd.get(fd, None) socket and result[1].append(socket) - if event & select.POLLERR or event & select.POLLHUP: #@UndefinedVariable + if event & select.POLLERR or event & select.POLLHUP: #@UndefinedVariable pylint: disable=E1101 socket = self.read_fd.get(fd, None) socket and result[2].append(socket) @@ -441,9 +441,9 @@ def sub_read(self, socket, owner = None): socket_fd = socket.fileno() self.read_fd[socket_fd] = socket self.read_o[socket] = owner - self._poll.register( #@UndefinedVariable + self._poll.register( #@UndefinedVariable pylint: disable=E1101 socket_fd, - select.POLLIN #@UndefinedVariable + select.POLLIN #@UndefinedVariable pylint: disable=E1101 ) def sub_write(self, socket, owner = None): @@ -451,9 +451,9 @@ def sub_write(self, socket, owner = None): socket_fd = socket.fileno() self.write_fd[socket_fd] = socket self.write_o[socket] = owner - self._poll.modify( #@UndefinedVariable + self._poll.modify( #@UndefinedVariable pylint: disable=E1101 socket_fd, - select.POLLIN | select.POLLOUT #@UndefinedVariable + select.POLLIN | select.POLLOUT #@UndefinedVariable pylint: disable=E1101 ) def sub_error(self, socket, owner = None): @@ -463,7 +463,7 @@ def sub_error(self, socket, owner = None): def unsub_read(self, socket): if not socket in self.read_o: return socket_fd = socket.fileno() - self._poll.unregister( #@UndefinedVariable + self._poll.unregister( #@UndefinedVariable pylint: disable=E1101 socket_fd ) del self.read_fd[socket_fd] @@ -472,9 +472,9 @@ def unsub_read(self, socket): def unsub_write(self, socket): if not socket in self.write_o: return socket_fd = socket.fileno() - self._poll.modify( #@UndefinedVariable + self._poll.modify( #@UndefinedVariable pylint: disable=E1101 socket_fd, - select.POLLIN #@UndefinedVariable + select.POLLIN #@UndefinedVariable pylint: disable=E1101 ) del self.write_fd[socket_fd] del self.write_o[socket] diff --git a/src/netius/base/protocol.py b/src/netius/base/protocol.py index f79748a6..57a3a57e 100644 --- a/src/netius/base/protocol.py +++ b/src/netius/base/protocol.py @@ -262,8 +262,8 @@ def _flush_send(self): if not self._delayed: break if not self._writing: break data, address, callback = self._delayed.pop(0) - if address: self.send(data, address, callback = callback) - else: self.send(data, callback = callback) + if address: self.send(data, address, callback = callback) #pylint: disable=E1101 + else: self.send(data, callback = callback) #pylint: disable=E1101 class DatagramProtocol(Protocol): diff --git a/src/netius/base/server.py b/src/netius/base/server.py index c44f25af..028f9a25 100644 --- a/src/netius/base/server.py +++ b/src/netius/base/server.py @@ -188,7 +188,7 @@ def serve( # ensures the proper default address value, taking into account # the type of connection that is currently being used, this avoids - # problems with multiple stack based servers (ipv4 and ipv6) + # problems with multiple stack based servers (IPv4 and ipv6) if host == None: host = "::1" if ipv6 else "127.0.0.1" # defaults the provided SSL key and certificate paths to the @@ -230,7 +230,7 @@ def serve( # checks the type of service that is meant to be created and # creates a service socket according to the defined service family = socket.AF_INET6 if ipv6 else socket.AF_INET - family = socket.AF_UNIX if is_unix else family + family = socket.AF_UNIX if is_unix else family #@UndefinedVariable pylint: disable=E1101 if type == TCP_TYPE: self.socket = self.socket_tcp( ssl, key_file = key_file, @@ -258,7 +258,7 @@ def serve( # in case the set user id value the user of the current process should # be changed so that it represents the new (possibly unprivileged user) - if setuid: os.setuid(setuid) + if setuid: os.setuid(setuid) #pylint: disable=E1101 # in case the selected port is zero based, meaning that a randomly selected # port has been assigned by the bind operation the new port must be retrieved @@ -371,9 +371,9 @@ def socket_tcp( return _socket def socket_udp(self, family = socket.AF_INET, type = socket.SOCK_DGRAM): - # prints a small debug message about the udp socket that is going + # prints a small debug message about the UDP socket that is going # to be created for the server's connection - self.debug("Creating server's udp socket ...") + self.debug("Creating server's UDP socket ...") # creates the socket that it's going to be used for the listening # of new connections (server socket) and sets it as non blocking @@ -385,7 +385,7 @@ def socket_udp(self, family = socket.AF_INET, type = socket.SOCK_DGRAM): _socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) _socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - # returns the created udp socket to the calling method so that it + # returns the created UDP socket to the calling method so that it # may be used from this point on return _socket @@ -910,7 +910,7 @@ def on_socket_c(self, socket_c, address): # to be to store pending callable operations in it if self.ssl: socket_c.pending = None - # verifies if the socket is of type internet (either ipv4 + # verifies if the socket is of type internet (either IPv4 # of ipv6), this is going to be used for conditional setting # of some of the socket options is_inet = socket_c.family in (socket.AF_INET, socket.AF_INET6) diff --git a/src/netius/base/service.py b/src/netius/base/service.py new file mode 100644 index 00000000..1df782bf --- /dev/null +++ b/src/netius/base/service.py @@ -0,0 +1,100 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Hive Netius System +# Copyright (c) 2008-2017 Hive Solutions Lda. +# +# This file is part of Hive Netius System. +# +# Hive Netius System is free software: you can redistribute it and/or modify +# it under the terms of the Apache License as published by the Apache +# Foundation, either version 2.0 of the License, or (at your option) any +# later version. +# +# Hive Netius System is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Apache License for more details. +# +# You should have received a copy of the Apache License along with +# Hive Netius System. If not, see . + +__author__ = "João Magalhães " +""" The author(s) of the module """ + +__version__ = "1.0.0" +""" The version of the module """ + +__revision__ = "$LastChangedRevision$" +""" The revision number of the module """ + +__date__ = "$LastChangedDate$" +""" The last change date of the module """ + +__copyright__ = "Copyright (c) 2008-2017 Hive Solutions Lda." +""" The copyright for the module """ + +__license__ = "Apache License, Version 2.0" +""" The license for the module """ + +import uuid +import socket + +from . import observer +from . import transport + +BUFFER_SIZE_S = None +""" The size of both the send and receive buffers for +the socket representing the server, this socket is +responsible for the handling of the new connections """ + +BUFFER_SIZE_C = None +""" The size of the buffers (send and receive) that +is going to be set on the on the sockets created by +the server (client sockets), this is critical for a +good performance of the server (large value) """ + +class Service(observer.Observable): + """ + Top level class responsible for the single representation + of the meta-data associated with a service. + + This is considered to be the equivalent to a connection object + for the servers (as opposed to clients). + + This implementation takes inspiration from the asyncio stream + and should be very compatible in terms of API. + """ + + def __init__( + self, + owner = None, + transport = None, + socket = None, + ssl = False, + receive_buffer_s = BUFFER_SIZE_S, + send_buffer_s = BUFFER_SIZE_S, + receive_buffer_c = BUFFER_SIZE_C, + send_buffer_c = BUFFER_SIZE_C + ): + observer.Observable.__init__(self) + self.id = str(uuid.uuid4()) + self.owner = owner + self.transport = transport + self.socket = socket + self.ssl = ssl + self.receive_buffer_s = receive_buffer_s + self.send_buffer_s = send_buffer_s + self.receive_buffer_c = receive_buffer_c + self.send_buffer_c = send_buffer_c + + def on_socket_c(self, socket_c, address): + connection = self.owner.build_connection_client( + socket_c, + address, + ssl = self.ssl, + receive_buffer_c = self.receive_buffer_c, + send_buffer_c = self.send_buffer_c + ) + _transport = transport.TransportStream(self, connection) + self.trigger("connection", connection) diff --git a/src/netius/base/transport.py b/src/netius/base/transport.py index 187c7fec..71d21ecb 100644 --- a/src/netius/base/transport.py +++ b/src/netius/base/transport.py @@ -39,6 +39,7 @@ from . import errors from . import observer +from . import asynchronous class Transport(observer.Observable): """ @@ -54,6 +55,7 @@ class Transport(observer.Observable): """ def __init__(self, loop, connection, open = True): + observer.Observable.__init__(self) self._loop = loop self._connection = connection self._protocol = None @@ -174,7 +176,7 @@ def set_extra_dict(self): compression = lambda: self._connection.socket.compression(), cipher = lambda: self._connection.socket.cipher(), peercert = lambda: self._connection.socket.getpeercert(), - sslcontext = lambda: self._connection.socket.context, + sslcontext = lambda: self._connection.socket.context if hasattr(self._connection.socket, "context") else None, ssl_object = lambda: self._connection.socket ) @@ -313,3 +315,115 @@ def _on_close(self, connection): if not self._protocol == None: self._protocol.eof_received() self._cleanup() + +class ServerTransport(observer.Observable): + """ + Decorator class to be used to add the functionality of a + server layer using a simplified and standard API that provides + the usage of the previously defined transport. + + Allows adding the functionality to an internal netius + service (or equivalent) object. + + This approach is heavily influenced by the design of the + asyncio Python infra-structure and should provide a mostly + compatible interface. + + :see: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.Server + """ + + def __init__(self, loop, service, open = True): + observer.Observable.__init__(self) + self._loop = loop + self._service = service + self._protocol = None + self._extra_dict = None + self._exhausted = False + self._serving = False + if open: self.open() + + def __aenter__(self): + coroutine = self._aenter() + return asynchronous.coroutine_return(coroutine) + + def __aexit__(self, *exc): + coroutine = self._aexit() + return asynchronous.coroutine_return(coroutine) + + def open(self): + pass + + def close(self): + pass + + def get_loop(self): + return self._loop + + def start_serving(self): + coroutine = self._start_serving() + return asynchronous.coroutine_return(coroutine) + + def serve_forever(self): + coroutine = self._serve_forever() + return asynchronous.coroutine_return(coroutine) + + def is_serving(self): + return True + + def _set_compat(self, protocol_factory): + self.sockets = [self._service.socket] + self._set_binds() + self._set_protocol_factory(protocol_factory) + + def _set_binds(self): + self._service.bind("connection", self._on_connection) + + def _set_protocol_factory(self, protocol_factory, mark = True): + self._protocol_factory = protocol_factory + + def _on_connection(self, connection): + protocol = self._protocol_factory() + transport = TransportStream(self._loop, connection) + transport._set_compat(protocol) + + def _start_serving(self): + # in case the current context is already serving + # content, then ignores the current request, otherwise + # sets the context as serving + if self._serving: return + self._serving = True + + # iterates over the complete set of sockets of the + # server to be able to listed to them + for sock in self.sockets: + sock.listen(self._backlog) + self._loop._start_serving( + self._protocol_factory, + sock, + self._ssl_context, + self, + self._backlog, + self._ssl_handshake_timeout + ) + + # skips one loop iteration so that all 'loop.add_reader' + # go through. + #await tasks.sleep(0) + return None + + def _serve_forever(self): + future = self._loop.create_future() + yield future + + def _aenter(self): + try: future = self._loop.create_future() + except ReferenceError: yield None; return + future.set_result(self) + yield future + + def _aexit(self): + try: future = self._loop.create_future() + except ReferenceError: yield None; return + self.close() + future.set_result(self) + yield future diff --git a/src/netius/clients/http.py b/src/netius/clients/http.py index 69690bee..6f9c8474 100644 --- a/src/netius/clients/http.py +++ b/src/netius/clients/http.py @@ -826,7 +826,7 @@ def wrap_request( buffer = tempfile.NamedTemporaryFile(mode = "w+b") if use_file else [] self.request = dict(code = None, data = None) - def on_close(protocol): + def on_close(protocol): #pylint: disable=E0102 if _on_close: _on_close(protocol) protocol._request = None if self.request["code"]: return @@ -836,7 +836,7 @@ def on_close(protocol): request = self.request ) - def on_data(protocol, parser, data): + def on_data(protocol, parser, data): #pylint: disable=E0102 if _on_data: _on_data(protocol, parser, data) if use_file: buffer.write(data) else: buffer.append(data) @@ -844,7 +844,7 @@ def on_data(protocol, parser, data): self.request["received"] = received + len(data) self.request["last"] = time.time() - def callback(protocol, parser, message): + def callback(protocol, parser, message): #pylint: disable=E0102 if _callback: _callback(protocol, parser, message) if use_file: cls.set_request_file(parser, buffer, request = self.request) else: cls.set_request(parser, buffer, request = self.request) @@ -1421,7 +1421,7 @@ def on_message(protocol, parser, message): self.available[protocol.key] = protocol netius.compat_loop(loop).stop() - def on_close(protocol): + def on_close(protocol): #pylint: disable=E0102 # verifies if the protocol being closed is currently in # the pool of available protocols, so that decisions on # the stopping of the event loop may be made latter on diff --git a/src/netius/common/util.py b/src/netius/common/util.py index 887c5834..ae8f8653 100644 --- a/src/netius/common/util.py +++ b/src/netius/common/util.py @@ -219,7 +219,7 @@ def random_integer(number_bits): def host(default = "127.0.0.1"): """ Retrieves the host for the current machine, - typically this would be the ipv4 address of + typically this would be the IPv4 address of the main network interface. No result type are guaranteed and a local address diff --git a/src/netius/servers/__init__.py b/src/netius/servers/__init__.py index b38c6a5c..dbe38829 100644 --- a/src/netius/servers/__init__.py +++ b/src/netius/servers/__init__.py @@ -52,7 +52,7 @@ from .dhcp import DHCPRequest, DHCPServer from .echo_ws import EchoWSServer -from .echo import EchoServer +from .echo import EchoProtocol, EchoServer from .ftp import FTPConnection, FTPServer from .http import HTTPConnection, HTTPServer from .http2 import HTTP2Server diff --git a/src/netius/servers/echo.py b/src/netius/servers/echo.py index 56ddbf12..f82f5f14 100644 --- a/src/netius/servers/echo.py +++ b/src/netius/servers/echo.py @@ -40,14 +40,41 @@ import netius class EchoProtocol(netius.StreamProtocol): - pass + + def on_data(self, data): + netius.StreamProtocol.on_data(self, data) + self.send(data) + + def serve( + self, + host = "127.0.0.1", + port = 8888, + ssl = False, + env = False, + loop = None + ): + loop = netius.serve_stream( + lambda: self, + host = host, + port = port, + ssl = ssl, + loop = loop, + env = env + ) + return loop, self class EchoServer(netius.ServerAgent): protocol = EchoProtocol + @classmethod + def serve_s(cls, **kwargs): + protocol = cls.protocol() + return protocol.serve(**kwargs) + if __name__ == "__main__": - server = EchoServer() - server.serve(env = True) + loop, _protocol = EchoServer.serve_s() + loop.run_forever() + loop.close() else: __path__ = [] diff --git a/src/netius/servers/proxy.py b/src/netius/servers/proxy.py index 8085d49f..de55def3 100644 --- a/src/netius/servers/proxy.py +++ b/src/netius/servers/proxy.py @@ -483,21 +483,23 @@ def _on_prx_close(self, client, _connection): # in case the connection is under the waiting state # the forbidden response is set to the client otherwise # the front-end connection is closed immediately - if _connection.waiting: connection.send_response( - data = cls.build_data( - "Forbidden", - url = _connection.error_url if\ - hasattr(_connection, "error_url") else None - ), - headers = dict( - connection = "close" - ), - code = 403, - code_s = "Forbidden", - apply = True, - callback = self._prx_close - ) - else: connection.close(flush = True) + if _connection.waiting: + connection.send_response( + data = cls.build_data( + "Forbidden", + url = _connection.error_url if\ + hasattr(_connection, "error_url") else None + ), + headers = dict( + connection = "close" + ), + code = 403, + code_s = "Forbidden", + apply = True, + callback = self._prx_close + ) + else: + connection.close(flush = True) # removes the waiting state from the connection and # the removes the back-end to front-end connection diff --git a/src/netius/servers/runners/__init__.py b/src/netius/servers/runners/__init__.py new file mode 100644 index 00000000..52c2dd14 --- /dev/null +++ b/src/netius/servers/runners/__init__.py @@ -0,0 +1,35 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Hive Netius System +# Copyright (c) 2008-2020 Hive Solutions Lda. +# +# This file is part of Hive Netius System. +# +# Hive Netius System is free software: you can redistribute it and/or modify +# it under the terms of the Apache License as published by the Apache +# Foundation, either version 2.0 of the License, or (at your option) any +# later version. +# +# Hive Netius System is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Apache License for more details. +# +# You should have received a copy of the Apache License along with +# Hive Netius System. If not, see . + +__version__ = "1.0.0" +""" The version of the module """ + +__revision__ = "$LastChangedRevision$" +""" The revision number of the module """ + +__date__ = "$LastChangedDate$" +""" The last change date of the module """ + +__copyright__ = "Copyright (c) 2008-2020 Hive Solutions Lda." +""" The copyright for the module """ + +__license__ = "Apache License, Version 2.0" +""" The license for the module """ diff --git a/src/netius/servers/runners/echo.py b/src/netius/servers/runners/echo.py new file mode 100644 index 00000000..ffb3fe9f --- /dev/null +++ b/src/netius/servers/runners/echo.py @@ -0,0 +1,70 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Hive Netius System +# Copyright (c) 2008-2020 Hive Solutions Lda. +# +# This file is part of Hive Netius System. +# +# Hive Netius System is free software: you can redistribute it and/or modify +# it under the terms of the Apache License as published by the Apache +# Foundation, either version 2.0 of the License, or (at your option) any +# later version. +# +# Hive Netius System is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Apache License for more details. +# +# You should have received a copy of the Apache License along with +# Hive Netius System. If not, see . + +__author__ = "João Magalhães " +""" The author(s) of the module """ + +__version__ = "1.0.0" +""" The version of the module """ + +__revision__ = "$LastChangedRevision$" +""" The revision number of the module """ + +__date__ = "$LastChangedDate$" +""" The last change date of the module """ + +__copyright__ = "Copyright (c) 2008-2020 Hive Solutions Lda." +""" The copyright for the module """ + +__license__ = "Apache License, Version 2.0" +""" The license for the module """ + +import os + +import netius + +from netius.servers import EchoProtocol, EchoServer + +async def main_asyncio(): + # retrieves a reference to the event loop as we plan to use + # low-level APIs, this should return the default event loop + import asyncio + loop = asyncio.get_running_loop() + server = await loop.create_server(lambda: EchoProtocol(), "127.0.0.1", 8888) + async with server: + await server.serve_forever() + +def run_native(): + loop, _protocol = EchoServer.serve_s(host = "127.0.0.1", port = 8888) + loop.run_forever() + loop.close() + +def run_asyncio(): + netius.run(main_asyncio()) + +if __name__ == "__main__": + if os.environ.get("ASYNCIO", "0") == "1" or\ + os.environ.get("COMPAT", "0") == "1": + run_asyncio() + else: + run_native() +else: + __path__ = [] diff --git a/src/netius/test/base/asynchronous.py b/src/netius/test/base/asynchronous.py index 2bb9108b..4817f976 100644 --- a/src/netius/test/base/asynchronous.py +++ b/src/netius/test/base/asynchronous.py @@ -86,3 +86,23 @@ def test_sleep(self): self.assertEqual(timeout, 1.0) self.assertEqual(isinstance(future, netius.Future), True) self.assertEqual(future.done(), True) + +class FutureTest(unittest.TestCase): + + def test_is_future(self): + future = netius.Future() + + result = netius.is_future(future) + self.assertEqual(result, True) + + def test_is_future_native(self): + try: import asyncio + except: asyncio = None + + if not asyncio or not hasattr(asyncio, "isfuture"): + self.skipTest("No asyncio or asyncio.isfuture() available") + + future = netius.Future() + + result = asyncio.isfuture(future) + self.assertEqual(result, True)