[postgres] share db engine with watcher_thread (#7143)
Re-use the engine (and the pool settings we "upgrade" the instance to for dagit) in the pg event watcher thread instead of connecting on each event. For some usage patterns this repeated connect cost is very substantial.

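For illustration, here is a minimal sketch of the pattern (assuming SQLAlchemy; the class and method names such as `MyEventLogStorage` and `MyEventWatcher` are hypothetical, not the actual Dagster classes): the storage builds its engine once, lazily constructs the event watcher on the first `watch()` call, and hands that engine to the watcher so the polling thread reuses it rather than calling `create_engine` per event.

```python
# Hedged sketch of the shared-engine pattern; names are illustrative,
# not the actual Dagster implementation.
import threading
from typing import Optional

import sqlalchemy as db


class MyEventLogStorage:
    def __init__(self, postgres_url: str):
        self.postgres_url = postgres_url
        # One engine per storage instance. NullPool is the conservative default;
        # a long-running process may swap in pooled settings, which a lazily
        # created watcher then inherits automatically.
        self._engine = db.create_engine(
            postgres_url, isolation_level="AUTOCOMMIT", poolclass=db.pool.NullPool
        )
        # Lazy init: storages that never watch pay no watcher cost.
        self._event_watcher: Optional["MyEventWatcher"] = None

    def watch(self, run_id, start_cursor, callback):
        if self._event_watcher is None:
            # Pass the current engine so the watcher thread shares it.
            self._event_watcher = MyEventWatcher(self.postgres_url, self._engine)
        self._event_watcher.watch_run(run_id, start_cursor, callback)


class MyEventWatcher:
    def __init__(self, conn_string: str, engine: db.engine.Engine):
        self._conn_string = conn_string
        self._engine = engine  # reused on every poll instead of a fresh create_engine

    def watch_run(self, run_id, start_cursor, callback):
        threading.Thread(
            target=self._poll, args=(run_id, start_cursor, callback), daemon=True
        ).start()

    def _poll(self, run_id, start_cursor, callback):
        # Before this change the polling loop rebuilt an engine (and reconnected)
        # on each iteration; now each poll just borrows a connection from the
        # shared engine. A real implementation would also check an exit event.
        with self._engine.connect() as conn:
            conn.execute(db.text("SELECT 1"))  # placeholder for the event query
```

Because the watcher is created lazily, it picks up whatever engine the instance holds at the time of the first watch, so a process that has upgraded its pool settings (as described above for dagit) shares those settings with the polling thread.
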
## Test Plan

existing test suite
alangenfeld committed Mar 22, 2022
1 parent bfb8978 commit f426230
Showing 1 changed file with 16 additions and 7 deletions.
@@ -67,12 +67,14 @@ def __init__(self, postgres_url, should_autocreate_tables=True, inst_data=None):

self._disposed = False

self._event_watcher = PostgresEventWatcher(self.postgres_url)

# Default to not holding any connections open to prevent accumulating connections per DagsterInstance
self._engine = create_engine(
self.postgres_url, isolation_level="AUTOCOMMIT", poolclass=db.pool.NullPool
)

# lazy init
self._event_watcher: Optional[PostgresEventWatcher] = None

self._secondary_index_cache = {}

table_names = retry_pg_connection_fn(lambda: db.inspect(self._engine).get_table_names())
@@ -257,9 +259,15 @@ def enable_secondary_index(self, name):
del self._secondary_index_cache[name]

def watch(self, run_id, start_cursor, callback):
if self._event_watcher is None:
self._event_watcher = PostgresEventWatcher(self.postgres_url, self._engine)

self._event_watcher.watch_run(run_id, start_cursor, callback)

def end_watch(self, run_id, handler):
if self._event_watcher is None:
return

self._event_watcher.unwatch_run(run_id, handler)

def __del__(self):
@@ -269,14 +277,16 @@ def __del__(self):
def dispose(self):
if not self._disposed:
self._disposed = True
self._event_watcher.close()
if self._event_watcher:
self._event_watcher.close()


POLLING_CADENCE = 0.25


def watcher_thread(
conn_string: str,
engine: db.engine.Engine,
handlers_dict: MutableMapping[str, List[CallbackAfterCursor]],
dict_lock: threading.Lock,
watcher_thread_exit: threading.Event,
@@ -303,9 +313,6 @@ def watcher_thread(
with dict_lock:
handlers = handlers_dict.get(run_id, [])

engine = create_engine(
conn_string, isolation_level="AUTOCOMMIT", poolclass=db.pool.NullPool
)
try:
with engine.connect() as conn:
cursor_res = conn.execute(
@@ -330,8 +337,9 @@


class PostgresEventWatcher:
def __init__(self, conn_string: str):
def __init__(self, conn_string: str, engine: db.engine.Engine):
self._conn_string: str = check.str_param(conn_string, "conn_string")
self._engine = engine
self._handlers_dict: MutableMapping[str, List[CallbackAfterCursor]] = defaultdict(list)
self._dict_lock: threading.Lock = threading.Lock()
self._watcher_thread_exit: Optional[threading.Event] = None
@@ -356,6 +364,7 @@ def watch_run(
target=watcher_thread,
args=(
self._conn_string,
self._engine,
self._handlers_dict,
self._dict_lock,
self._watcher_thread_exit,
