From f2e16dd81a84b05376f22525e3a8e9eb696acb2e Mon Sep 17 00:00:00 2001 From: Riccardo Murri Date: Tue, 10 Apr 2018 15:35:30 +0200 Subject: [PATCH] Make SQL-based task store work nicely with forking. This fixes an issue in `SessionBasedDaemon`, where after forking the daemon would refuse to perform any operation on the DB with an error like "OperationalError: (...) could not receive data from server: Transport endpoint is not connected". --- gc3libs/cmdline.py | 3 +++ gc3libs/persistence/sql.py | 25 +++++++++++++++++++++++-- gc3libs/persistence/store.py | 26 ++++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 2 deletions(-) diff --git a/gc3libs/cmdline.py b/gc3libs/cmdline.py index 2e5adcaf..9a4c798d 100755 --- a/gc3libs/cmdline.py +++ b/gc3libs/cmdline.py @@ -2495,6 +2495,7 @@ def _main(self): "Keep running in foreground" " as requested with `-F`/`--foreground` option ...") else: + self.session.store.pre_fork() # redirect all output logfile = open(os.path.join(self.params.working_dir, 'daemon.log'), 'w') os.dup2(logfile.fileno(), 1) @@ -2517,6 +2518,8 @@ def _main(self): # prematurely removed while the daemon is still # preparing... atexit.register(rm_f, lockfile_path) + # un-suspend session store functionality + self.session.store.post_fork() self._start_inboxes() self._start_server() self.running = True diff --git a/gc3libs/persistence/sql.py b/gc3libs/persistence/sql.py index 8730a36d..45f69bfd 100755 --- a/gc3libs/persistence/sql.py +++ b/gc3libs/persistence/sql.py @@ -212,8 +212,10 @@ def _delayed_init(self): See `GC3Pie issue #550 `_ for more details and motivation. """ - self._real_engine = sqla.create_engine( - self._to_sqlalchemy_url(self.url)) + url = self._to_sqlalchemy_url(self.url) + gc3libs.log.debug( + "Initializing SQLAlchemy engine for `%s`...", url) + self._real_engine = sqla.create_engine(url) # create schema meta = sqla.MetaData(bind=self._real_engine) @@ -244,6 +246,25 @@ def _delayed_init(self): self._real_tables = meta.tables[self.table_name] + def pre_fork(self): + """ + Dispose current SQLAlchemy engine (if any). + A new SQLAlchemy engine will be initialized + upon the next interaction with a DB. + + This method only exists to allow `SessionBasedDaemon`:class: + and similar applications that can do DB operations after + fork()ing to continue to operate, without incurring into a + SQLAlchemy "OperationalError: (...) could not receive data + from server: Transport endpoint is not connected" + """ + if self._real_engine: + self._real_engine.dispose() + self._real_engine = None + self._real_extra_fields = None + self._real_tables = None + + @property def _engine(self): if self._real_engine is None: diff --git a/gc3libs/persistence/store.py b/gc3libs/persistence/store.py index 600ee692..cd5df583 100755 --- a/gc3libs/persistence/store.py +++ b/gc3libs/persistence/store.py @@ -55,6 +55,32 @@ def __init__(self, url=None): url = Url(url) self.url = url + def pre_fork(self): + """ + Make preparations for `fork()`ing the current process. + + This should close open network connections or any other + sockets or file descriptors that cannot be used by both the + parent and child process. + + The default implementation of this method does nothing; as of + 2018-04-10, the only subclass making use of this functionality + is `SqlStore`:class:, which needs to dispose the SQLAlchemy + engine and re-create it after forking. + """ + pass + + def post_fork(self): + """ + Restore functionality that was suspended in `pre_fork`:meth: + + This method will be called after forking/daemonizing has been + successfully accomplished. + + The default implementation of this method does nothing. + """ + pass + def list(self, **extra_args): """ Return list of IDs of saved `Job` objects.