Skip to content

Commit

Permalink
Added libs/threading_udp_server.my.
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrien Delle Cave committed Oct 19, 2022
1 parent ff5edfc commit 19a5be9
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 5 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG
@@ -1,3 +1,9 @@
python-sonicprobe (0.3.49) unstable; urgency=medium

* Added libs/threading_udp_server.my.

-- Adrien DELLE CAVE (Decryptus) <adc@doowan.net> Wed, 19 Oct 2022 10:31:20 +0200

python-sonicprobe (0.3.48) unstable; urgency=medium

* Fixed password_check in sonicprobe.helpers.
Expand Down
2 changes: 1 addition & 1 deletion RELEASE
@@ -1 +1 @@
0.3.48
0.3.49
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.3.48
0.3.49
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,4 +1,4 @@
httpdis>=0.6.25
httpdis>=0.6.26
pyOpenSSL
python-magic
psutil>=2.1
Expand Down
4 changes: 2 additions & 2 deletions setup.yml
Expand Up @@ -4,8 +4,8 @@ description: sonicprobe
author: Adrien Delle Cave
author_email: pypi@doowan.net
copyright: '2022 Adrien Delle Cave'
release: '0.3.48'
version: '0.3.48'
release: '0.3.49'
version: '0.3.49'
license: License GPL-3
url: https://github.com/decryptus/sonicprobe
python_requires:
Expand Down
181 changes: 181 additions & 0 deletions sonicprobe/libs/threading_udp_server.py
@@ -0,0 +1,181 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015-2019 Adrien Delle Cave
# SPDX-License-Identifier: GPL-3.0-or-later
"""sonicprobe.libs.threading_udp_server"""

import logging
import threading
import time

from six.moves import queue, socketserver
from sonicprobe.libs.workerpool import WorkerPool

LOG = logging.getLogger('sonicprobe.threading-udp-server')


class ThreadingHTTPServer(socketserver.ThreadingUDPServer):
"""
Same as HTTPServer, but derives from ThreadingUDPServer instead of
UDPServer so that each http handler instance runs in its own thread.
"""

allow_reuse_address = 1 # Seems to make sense in testing environment

def server_bind(self):
"""Override server_bind to store the server name."""
socketserver.UDPServer.server_bind(self)
host, port = self.socket.getsockname()[:2]
self.server_name = socketserver.socket.getfqdn(host)
self.server_port = port


class KillableDynThreadingUDPServer(socketserver.ThreadingUDPServer):
_killed = False
allow_reuse_address = 1 # Seems to make sense in testing environment

def __init__(self, config, server_address, RequestHandlerClass, bind_and_activate = True, name = None):
socketserver.ThreadingUDPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate)

max_workers = int(config.get('max_workers', 0))
max_requests = int(config.get('max_requests'))
max_life_time = int(config.get('max_life_time'))

if max_workers < 1:
max_workers = 1

self.requests = queue.Queue()
self.workerpool = WorkerPool(name = name,
max_workers = max_workers,
max_tasks = max_requests,
life_time = max_life_time)

def kill(self):
self._killed = True
self.workerpool.killall(0)
return self._killed

def killed(self):
return self._killed

def handle_request(self):
"""simply collect requests and put them on the queue for the workers."""
try:
request, client_address = self.get_request()
except socketserver.socket.error:
return

if self.verify_request(request, client_address):
self.workerpool.run(self.process_request_thread,
**{'request': request,
'client_address': client_address})

def handle_error(self, request, client_address):
LOG.debug("Exception happened during processing of request from: %r", client_address)
LOG.debug("", exc_info = 1)

def serve_until_killed(self):
"""Handle one request at a time until we are murdered."""
while not self.killed():
self.handle_request()


class KillableDynThreadingHTTPServer(KillableDynThreadingUDPServer, ThreadingHTTPServer):
def server_bind(self):
ThreadingHTTPServer.server_bind(self)


class KillableThreadingUDPServer(socketserver.ThreadingUDPServer):
_killed = False
allow_reuse_address = 1 # Seems to make sense in testing environment

def __init__(self, config, server_address, RequestHandlerClass, bind_and_activate = True, name = None):
socketserver.ThreadingUDPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate)

self.worker_name = name

self.max_workers = int(config.get('max_workers', 0))
self.max_requests = int(config.get('max_requests', 0))
self.max_life_time = int(config.get('max_life_time', 0))

if self.max_workers < 1:
self.max_workers = 1

self.requests = queue.Queue(self.max_workers)

self.add_worker(self.max_workers)

def kill(self):
self._killed = True
return self._killed

def killed(self):
return self._killed

def add_worker(self, nb = 1, name = None):
tname = name or self.worker_name or "Thread"

for n in range(nb): # pylint: disable=unused-variable
t = threading.Thread(target = self.process_request_thread,
args = (self,))
t.setName(threading._newname("%s:%%d" % tname)) # pylint: disable=protected-access
t.daemon = True
t.start()

def process_request_thread(self, mainthread): # pylint: disable=arguments-differ
"""obtain request from queue instead of directly from server socket"""
life_time = time.time()
nb_requests = 0

while not mainthread.killed():
if self.max_life_time > 0:
if (time.time() - life_time) >= self.max_life_time:
mainthread.add_worker(1)
return
try:
try:
socketserver.ThreadingUDPServer.process_request_thread(self, *self.requests.get(True, 0.5))
except queue.Empty:
continue
except AttributeError:
return
else:
socketserver.ThreadingUDPServer.process_request_thread(self, *self.requests.get())

LOG.debug("nb_requests: %d, max_requests: %d", nb_requests, self.max_requests)
nb_requests += 1

if self.max_requests > 0 and nb_requests >= self.max_requests:
mainthread.add_worker(1)
return

def handle_request(self):
"""simply collect requests and put them on the queue for the workers."""
try:
request, client_address = self.get_request()
except socketserver.socket.error:
return

if self.verify_request(request, client_address):
self.requests.put((request, client_address))

def handle_error(self, request, client_address):
LOG.debug("Exception happened during processing of request from: %r", client_address)
LOG.debug("", exc_info = 1)

def serve_until_killed(self):
"""Handle one request at a time until we are murdered."""
while not self.killed():
self.handle_request()


class KillableThreadingHTTPServer(KillableThreadingUDPServer, ThreadingHTTPServer):
def server_bind(self):
ThreadingHTTPServer.server_bind(self)


__all__ = [
'ThreadingHTTPServer',
'KillableThreadingUDPServer',
'KillableThreadingHTTPServer',
'KillableDynThreadingUDPServer',
'KillableDynThreadingHTTPServer']

0 comments on commit 19a5be9

Please sign in to comment.