Skip to content

Commit

Permalink
fix IPC server graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
briangu committed Oct 14, 2023
1 parent a11f153 commit 95c5b11
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 17 deletions.
30 changes: 17 additions & 13 deletions klongpy/sys_fn_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,10 +608,11 @@ def __str__(self):


class TcpServerConnectionHandler:
def __init__(self, ioloop, klongloop, klong):
def __init__(self, ioloop, klongloop, klong, shutdown_event=None):
self.ioloop = ioloop
self.klong = klong
self.klongloop = klongloop
self.shutdown_event = shutdown_event

async def _on_connect(self, nc):
logging.info(f"New connection from {str(nc.conn_provider)}")
Expand Down Expand Up @@ -654,7 +655,7 @@ async def handle_client(self, reader: StreamReader, writer: StreamWriter):
if host == "::1":
host = "localhost"
conn_provider = ReaderWriterConnectionProvider(reader, writer, host, port)
nc = NetworkClient.create_from_conn_provider(self.ioloop, self.klongloop, self.klong, conn_provider, on_connect=self._on_connect, on_close=self._on_close, on_error=self._on_error)
nc = NetworkClient.create_from_conn_provider(self.ioloop, self.klongloop, self.klong, conn_provider, shutdown_event=self.shutdown_event, on_connect=self._on_connect, on_close=self._on_close, on_error=self._on_error)
try:
await nc.run_server()
finally:
Expand All @@ -668,10 +669,10 @@ def __init__(self):
self.server = None
self.connections = []

def create_server(self, ioloop, klongloop, klong, bind, port):
def create_server(self, ioloop, klongloop, klong, bind, port, shutdown_event=None):
if self.task is not None:
return 0
self.connection_handler = TcpServerConnectionHandler(ioloop, klongloop, klong)
self.connection_handler = TcpServerConnectionHandler(ioloop, klongloop, klong, shutdown_event=shutdown_event)
self.task = ioloop.call_soon_threadsafe(asyncio.create_task, self.run_server(bind, port))
return 1

Expand All @@ -682,7 +683,6 @@ def shutdown_server(self):
if not writer.is_closing():
writer.close()
self.connections.clear()

self.server.close()
self.server = None
self.task.cancel()
Expand All @@ -701,13 +701,9 @@ async def handle_client(self, reader, writer):
self.connections.remove(writer)

async def run_server(self, bind, port):
self.server = await asyncio.start_server(self.handle_client, bind, port, reuse_address=True)

self.server = await asyncio.start_server(self.handle_client, bind, port, reuse_address=True, start_serving=True)
addr = self.server.sockets[0].getsockname()
logging.info(f'Serving on {addr}')

async with self.server:
await self.server.serve_forever()
logging.info(f'Serving IPC on {addr}')

class NetworkClientDictHandle(dict):
def __init__(self, nc: NetworkClient):
Expand Down Expand Up @@ -920,12 +916,20 @@ def eval_sys_fn_create_ipc_server(klong, x):
parts = x.split(":")
bind = parts[0] if len(parts) > 1 else None
port = int(parts[0] if len(parts) == 1 else parts[1])
system = klong['.system']
shutdown_event = system['closeEvent']
if len(parts) == 1 and port == 0:
shutdown_event.unsubscribe(_ipc_tcp_server.shutdown_server)
return _ipc_tcp_server.shutdown_server()
system = klong['.system']
ioloop = system['ioloop']
klongloop = system['klongloop']
return _ipc_tcp_server.create_server(ioloop, klongloop, klong, bind, port)
# subscribe to the shutdown event and run shutdown_server in the klong loop
async def async_shutdown_in_klongloop():
_ipc_tcp_server.shutdown_server()
def shutdown_in_klongloop():
klongloop.call_soon_threadsafe(asyncio.create_task, async_shutdown_in_klongloop())
shutdown_event.subscribe(shutdown_in_klongloop)
return _ipc_tcp_server.create_server(ioloop, klongloop, klong, bind, port, shutdown_event=shutdown_event)


class KGAsyncCall(KGLambda):
Expand Down
6 changes: 3 additions & 3 deletions klongpy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ def __iter__(self):

class CallbackEvent:
def __init__(self):
self.subscribers = []
self.subscribers = set()

def subscribe(self, callback):
self.subscribers.append(callback)
self.subscribers.add(callback)

def unsubscribe(self, callback):
try:
self.subscribers.remove(callback)
self.subscribers.discard(callback)
except ValueError:
# Callback was not found in the list of subscribers
pass
Expand Down
2 changes: 1 addition & 1 deletion scripts/kgpy
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ if __name__ == "__main__":
run_file(klong_loop, klong, args.filename, verbose=args.verbose)
# block while there are running io tasks (e.g. IPC server)
# is there a better way to do this?
while len(asyncio.all_tasks(loop=io_loop)) > 1:
while len(asyncio.all_tasks(loop=io_loop)) > 0:
time.sleep(0.1)
else:
run_repl = True
Expand Down

0 comments on commit 95c5b11

Please sign in to comment.