Skip to content

Commit eaa1141

Browse files
committed
Refactor server config
1 parent 745c073 commit eaa1141

File tree

9 files changed

+157
-1467
lines changed

9 files changed

+157
-1467
lines changed

plain/plain/server/app.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,12 @@ def do_load_config(self) -> None:
3434
Loads the configuration
3535
"""
3636
try:
37-
self.load_default_config()
3837
self.load_config()
3938
except Exception as e:
4039
print(f"\nError: {str(e)}", file=sys.stderr)
4140
sys.stderr.flush()
4241
sys.exit(1)
4342

44-
def load_default_config(self) -> None:
45-
# init configuration
46-
self.cfg = Config(self.usage, prog=self.prog)
47-
4843
def init(
4944
self, parser: argparse.ArgumentParser, opts: argparse.Namespace, args: list[str]
5045
) -> None:

plain/plain/server/arbiter.py

Lines changed: 53 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class Arbiter:
3535
"""
3636
Arbiter maintain the workers processes alive. It launches or
3737
kills them if needed. It also manages application reloading
38-
via SIGHUP/USR2.
38+
via SIGHUP.
3939
"""
4040

4141
# A flag indicating if a worker failed to
@@ -75,9 +75,6 @@ def __init__(self, app: BaseApplication):
7575

7676
self.pidfile: Pidfile | None = None
7777
self.worker_age: int = 0
78-
self.reexec_pid: int = 0
79-
self.master_pid: int = 0
80-
self.master_name: str = "Master"
8178

8279
cwd = util.getcwd()
8380

@@ -97,63 +94,34 @@ def _set_num_workers(self, value: int) -> None:
9794

9895
def setup(self, app: BaseApplication) -> None:
9996
self.app: BaseApplication = app
97+
assert app.cfg is not None, "Application config must be initialized"
10098
self.cfg: Config = app.cfg
10199

102100
if self.log is None:
103-
self.log = self.cfg.logger_class(app.cfg)
101+
from .glogging import Logger
104102

105-
# reopen files
106-
if "PLAIN_SERVER_PID" in os.environ:
107-
self.log.reopen_files()
103+
self.log = Logger(self.cfg)
108104

109105
self.worker_class: type[Worker] = self.cfg.worker_class
110106
self.address: str = self.cfg.address
111107
self.num_workers = self.cfg.workers
112108
self.timeout: int = self.cfg.timeout
113-
self.proc_name: str = self.cfg.proc_name
114-
115-
self.log.debug(
116-
"Current configuration:\n{}".format(
117-
"\n".join(
118-
f" {config}: {value.value}"
119-
for config, value in sorted(
120-
self.cfg.settings.items(), key=lambda setting: setting[1]
121-
)
122-
)
123-
)
124-
)
125109

126110
def start(self) -> None:
127111
"""\
128112
Initialize the arbiter. Start listening and set pidfile if needed.
129113
"""
130114
self.log.info("Starting plain server %s", plain.runtime.__version__)
131115

132-
if "PLAIN_SERVER_PID" in os.environ:
133-
self.master_pid = int(os.environ.get("PLAIN_SERVER_PID"))
134-
self.proc_name = self.proc_name + ".2"
135-
self.master_name = "Master.2"
136-
137116
self.pid: int = os.getpid()
138117
if self.cfg.pidfile is not None:
139-
pidname = self.cfg.pidfile
140-
if self.master_pid != 0:
141-
pidname += ".2"
142-
self.pidfile = Pidfile(pidname)
118+
self.pidfile = Pidfile(self.cfg.pidfile)
143119
self.pidfile.create(self.pid)
144120

145121
self.init_signals()
146122

147123
if not self.LISTENERS:
148-
fds = None
149-
150-
if self.master_pid:
151-
fds = []
152-
for fd in os.environ.pop("PLAIN_SERVER_FD").split(","):
153-
fds.append(int(fd))
154-
155-
if not (self.cfg.reuse_port and hasattr(socket, "SO_REUSEPORT")):
156-
self.LISTENERS = sock.create_sockets(self.cfg, self.log, fds)
124+
self.LISTENERS = sock.create_sockets(self.cfg, self.log)
157125

158126
listeners_str = ",".join([str(lnr) for lnr in self.LISTENERS])
159127
self.log.debug("Arbiter booted")
@@ -199,8 +167,6 @@ def run(self) -> None:
199167
self.manage_workers()
200168

201169
while True:
202-
self.maybe_promote_master()
203-
204170
sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
205171
if sig is None:
206172
self.sleep()
@@ -245,7 +211,7 @@ def handle_hup(self) -> None:
245211
- Start the new worker processes with a new configuration
246212
- Gracefully shutdown the old worker processes
247213
"""
248-
self.log.info("Hang up: %s", self.master_name)
214+
self.log.info("Hang up: Master")
249215
self.reload()
250216

251217
def handle_term(self) -> None:
@@ -289,35 +255,16 @@ def handle_usr1(self) -> None:
289255
self.kill_workers(signal.SIGUSR1)
290256

291257
def handle_usr2(self) -> None:
292-
"""\
293-
SIGUSR2 handling.
294-
Creates a new arbiter/worker set as a fork of the current
295-
arbiter without affecting old workers. Use this to do live
296-
deployment with the ability to backout a change.
297-
"""
298-
self.reexec()
258+
"""SIGUSR2 handling"""
259+
# USR2 for graceful restart is not supported
260+
self.log.debug("SIGUSR2 ignored")
299261

300262
def handle_winch(self) -> None:
301263
"""SIGWINCH handling"""
302264
# SIGWINCH is typically used to gracefully stop workers when running as daemon
303265
# Since we don't support daemon mode, just log that it's ignored
304266
self.log.debug("SIGWINCH ignored")
305267

306-
def maybe_promote_master(self) -> None:
307-
if self.master_pid == 0:
308-
return None
309-
310-
if self.master_pid != os.getppid():
311-
self.log.info("Master has been promoted.")
312-
# reset master infos
313-
self.master_name = "Master"
314-
self.master_pid = 0
315-
self.proc_name = self.cfg.proc_name
316-
del os.environ["PLAIN_SERVER_PID"]
317-
# rename the pidfile
318-
if self.pidfile is not None:
319-
self.pidfile.rename(self.cfg.pidfile)
320-
321268
def wakeup(self) -> None:
322269
"""\
323270
Wake up the arbiter by writing to the PIPE
@@ -333,7 +280,7 @@ def halt(self, reason: str | None = None, exit_status: int = 0) -> None:
333280
self.stop()
334281

335282
log_func = self.log.info if exit_status == 0 else self.log.error
336-
log_func("Shutting down: %s", self.master_name)
283+
log_func("Shutting down: Master")
337284
if reason is not None:
338285
log_func("Reason: %s", reason)
339286

@@ -367,8 +314,7 @@ def stop(self, graceful: bool = True) -> None:
367314
:attr graceful: boolean, If True (the default) workers will be
368315
killed gracefully (ie. trying to wait for the current connection)
369316
"""
370-
unlink = self.reexec_pid == self.master_pid == 0 and not self.cfg.reuse_port
371-
sock.close_sockets(self.LISTENERS, unlink)
317+
sock.close_sockets(self.LISTENERS, unlink=True)
372318

373319
self.LISTENERS = []
374320
sig = signal.SIGTERM
@@ -383,34 +329,6 @@ def stop(self, graceful: bool = True) -> None:
383329

384330
self.kill_workers(signal.SIGKILL)
385331

386-
def reexec(self) -> None:
387-
"""\
388-
Relaunch the master and workers.
389-
"""
390-
if self.reexec_pid != 0:
391-
self.log.warning("USR2 signal ignored. Child exists.")
392-
return None
393-
394-
if self.master_pid != 0:
395-
self.log.warning("USR2 signal ignored. Parent exists.")
396-
return None
397-
398-
master_pid = os.getpid()
399-
self.reexec_pid = os.fork()
400-
if self.reexec_pid != 0:
401-
return None
402-
403-
environ = self.cfg.env_orig.copy()
404-
environ["PLAIN_SERVER_PID"] = str(master_pid)
405-
environ["PLAIN_SERVER_FD"] = ",".join(
406-
str(lnr.fileno()) for lnr in self.LISTENERS
407-
)
408-
409-
os.chdir(self.START_CTX["cwd"])
410-
411-
# exec the process using the original environment
412-
os.execvpe(self.START_CTX[0], self.START_CTX["args"], environ)
413-
414332
def reload(self) -> None:
415333
old_address = self.cfg.address
416334

@@ -477,49 +395,47 @@ def reap_workers(self) -> None:
477395
wpid, status = os.waitpid(-1, os.WNOHANG)
478396
if not wpid:
479397
break
480-
if self.reexec_pid == wpid:
481-
self.reexec_pid = 0
482-
else:
483-
# A worker was terminated. If the termination reason was
484-
# that it could not boot, we'll shut it down to avoid
485-
# infinite start/stop cycles.
486-
exitcode = status >> 8
487-
if exitcode != 0:
488-
self.log.error(
489-
"Worker (pid:%s) exited with code %s", wpid, exitcode
490-
)
491-
if exitcode == self.WORKER_BOOT_ERROR:
492-
reason = "Worker failed to boot."
493-
raise HaltServer(reason, self.WORKER_BOOT_ERROR)
494-
if exitcode == self.APP_LOAD_ERROR:
495-
reason = "App failed to load."
496-
raise HaltServer(reason, self.APP_LOAD_ERROR)
497-
498-
if exitcode > 0:
499-
# If the exit code of the worker is greater than 0,
500-
# let the user know.
501-
self.log.error(
502-
"Worker (pid:%s) exited with code %s.", wpid, exitcode
503-
)
504-
elif status > 0:
505-
# If the exit code of the worker is 0 and the status
506-
# is greater than 0, then it was most likely killed
507-
# via a signal.
508-
try:
509-
sig_name = signal.Signals(status).name
510-
except ValueError:
511-
sig_name = f"code {status}"
512-
msg = f"Worker (pid:{wpid}) was sent {sig_name}!"
513-
514-
# Additional hint for SIGKILL
515-
if status == signal.SIGKILL:
516-
msg += " Perhaps out of memory?"
517-
self.log.error(msg)
518-
519-
worker = self.WORKERS.pop(wpid, None)
520-
if not worker:
521-
continue
522-
worker.tmp.close()
398+
399+
# A worker was terminated. If the termination reason was
400+
# that it could not boot, we'll shut it down to avoid
401+
# infinite start/stop cycles.
402+
exitcode = status >> 8
403+
if exitcode != 0:
404+
self.log.error(
405+
"Worker (pid:%s) exited with code %s", wpid, exitcode
406+
)
407+
if exitcode == self.WORKER_BOOT_ERROR:
408+
reason = "Worker failed to boot."
409+
raise HaltServer(reason, self.WORKER_BOOT_ERROR)
410+
if exitcode == self.APP_LOAD_ERROR:
411+
reason = "App failed to load."
412+
raise HaltServer(reason, self.APP_LOAD_ERROR)
413+
414+
if exitcode > 0:
415+
# If the exit code of the worker is greater than 0,
416+
# let the user know.
417+
self.log.error(
418+
"Worker (pid:%s) exited with code %s.", wpid, exitcode
419+
)
420+
elif status > 0:
421+
# If the exit code of the worker is 0 and the status
422+
# is greater than 0, then it was most likely killed
423+
# via a signal.
424+
try:
425+
sig_name = signal.Signals(status).name
426+
except ValueError:
427+
sig_name = f"code {status}"
428+
msg = f"Worker (pid:{wpid}) was sent {sig_name}!"
429+
430+
# Additional hint for SIGKILL
431+
if status == signal.SIGKILL:
432+
msg += " Perhaps out of memory?"
433+
self.log.error(msg)
434+
435+
worker = self.WORKERS.pop(wpid, None)
436+
if not worker:
437+
continue
438+
worker.tmp.close()
523439
except OSError as e:
524440
if e.errno != errno.ECHILD:
525441
raise
@@ -575,8 +491,6 @@ def spawn_worker(self) -> int:
575491
worker.pid = os.getpid()
576492
try:
577493
self.log.info("Booting worker with pid: %s", worker.pid)
578-
if self.cfg.reuse_port:
579-
worker.sockets = sock.create_sockets(self.cfg, self.log)
580494
worker.init_process()
581495
sys.exit(0)
582496
except SystemExit:

0 commit comments

Comments
 (0)