Skip to content

Commit

Permalink
Add thread pool for TCPServer
Browse files Browse the repository at this point in the history
  • Loading branch information
wingel committed Aug 27, 2019
1 parent 3c2756a commit deb0216
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 3 deletions.
42 changes: 39 additions & 3 deletions ntske-server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import traceback
from socketserver import ThreadingTCPServer, TCPServer, BaseRequestHandler

from pooling import ThreadPoolTCPServer
from sslwrapper import SSLWrapper
from constants import *
from ntske_record import *
Expand Down Expand Up @@ -42,7 +43,7 @@ def pack_array(a):

class NTSKEHandler(BaseRequestHandler):
def handle(self):
print("Handle", self.client_address)
print("Handle", self.client_address, "in child", os.getpid())

self.keyid, self.key = self.server.helper.get_master_key()
s = self.server.wrapper.accept(self.request)
Expand Down Expand Up @@ -234,7 +235,10 @@ def get_response(self, c2s_key, s2c_key):

return records

class NTSKEServer(ThreadingTCPServer):
ChosenTCPServer = ThreadingTCPServer
ChosenTCPServer = ThreadPoolTCPServer

class NTSKEServer(ChosenTCPServer):
allow_reuse_address = True

def __init__(self, config_path):
Expand Down Expand Up @@ -266,7 +270,39 @@ def main():
config_path = sys.argv[1]

server = NTSKEServer(config_path)
server.serve_forever()

pids = []

if 1:
for i in range(3):
pid = os.fork()
if pid == 0:
print("child process", os.getpid())
try:
try:
server.serve_forever()
except KeyboardInterrupt:
print("keyboardinterrupt in child", os.getpid(), "...")
pass
print("child", os.getpid(), "stopping...")
server.server_close()
finally:
sys.exit(0)
else:
pids.append(pid)

try:
server.serve_forever()
except KeyboardInterrupt:
print("keyboardinterrupt")

print("shutting down...")

server.server_close()

for pid in pids:
p, status = os.wait()
print("child", p, "has stopped")

if __name__ == "__main__":
main()
61 changes: 61 additions & 0 deletions pooling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#! /usr/bin/python
from __future__ import division, print_function, unicode_literals

from concurrent.futures import ThreadPoolExecutor
import socketserver
import queue

class BoundedThreadPoolExecutor(ThreadPoolExecutor):
"""A variant of the ThreadPoolExecutor with a bounded queue size"""

def __init__(self, max_queue_size = 0, *args, **kwargs):
super(BoundedThreadPoolExecutor, self).__init__(
*args, **kwargs)
self._work_queue = queue.Queue(maxsize = max_queue_size)

class ThreadPoolMixIn:
"""Mix-in class to handle each request using a thread pool."""

# Maximum number of workers
max_workers = 100

# Maximum number of jobs queued for workers
max_queue_size = 1000

# Executor
_executor = None

def process_request_thread(self, request, client_address):
"""Same as in BaseServer but executed from a thread.
In addition, exception handling is done here.
"""

try:
self.finish_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
finally:
self.shutdown_request(request)

def process_request(self, request, client_address):
"""Process the request. Queue up work for a thread."""

if self._executor is None:
self._executor = BoundedThreadPoolExecutor(
max_workers = self.max_workers,
max_queue_size = self.max_queue_size)

self._executor.submit(self.process_request_thread,
request, client_address)

import os
print("child", os.getpid(), "queue size is", self._executor._work_queue.qsize())

def server_close(self):
super().server_close()
if self._executor is not None:
self._executor.close()

class ThreadPoolTCPServer(ThreadPoolMixIn, socketserver.TCPServer):
pass

0 comments on commit deb0216

Please sign in to comment.