Skip to content

Commit

Permalink
feat: Set default SQL statement timeouts (backport #18771) (#18800)
Browse files Browse the repository at this point in the history
Co-authored-by: Ankush Menat <ankush@frappe.io>
  • Loading branch information
mergify[bot] and ankush committed Nov 9, 2022
1 parent cd7bafa commit 127763a
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 1 deletion.
38 changes: 37 additions & 1 deletion frappe/database/database.py
Expand Up @@ -8,6 +8,7 @@

import datetime
import re
from contextlib import suppress
from time import time
from typing import Dict, List, Union

Expand All @@ -21,7 +22,7 @@
from frappe.database.query import Query
from frappe.model.utils.link_count import flush_local_link_count
from frappe.query_builder.utils import DocType
from frappe.utils import cast, get_datetime, get_table_name, getdate, now, sbool
from frappe.utils import cast, cint, get_datetime, get_table_name, getdate, now, sbool


class Database(object):
Expand Down Expand Up @@ -85,6 +86,18 @@ def connect(self):
self._cursor = self._conn.cursor()
frappe.local.rollback_observers = []

try:
execution_timeout = get_query_execution_timeout()
if execution_timeout:
self.set_execution_timeout(execution_timeout)
except Exception as e:
frappe.logger("database").warning(f"Couldn't set execution timeout {e}")

def set_execution_timeout(self, seconds: int):
"""Set session speicifc timeout on exeuction of statements.
If any statement takes more time it will be killed along with entire transaction."""
raise NotImplementedError

def use(self, db_name):
"""`USE` db_name."""
self._conn.select_db(db_name)
Expand Down Expand Up @@ -1273,3 +1286,26 @@ def enqueue_jobs_after_commit():
result_ttl=RQ_RESULTS_TTL,
)
frappe.flags.enqueue_after_commit = []


def get_query_execution_timeout() -> int:
"""Get execution timeout based on current timeout in different contexts.
HTTP requests: HTTP timeout or a default (300)
Background jobs: Job timeout
Console/Commands: No timeout = 0.
Note: Timeout adds 1.5x as "safety factor"
"""
from rq import get_current_job

if not frappe.conf.get("enable_db_statement_timeout"):
return 0

# Zero means no timeout, which is the default value in db.
timeout = 0
with suppress(Exception):
if getattr(frappe.local, "request", None):
timeout = frappe.conf.http_timeout or 300
elif get_current_job():
timeout = get_current_job().timeout

return int(cint(timeout) * 1.5)
7 changes: 7 additions & 0 deletions frappe/database/mariadb/database.py
Expand Up @@ -93,6 +93,9 @@ def get_connection(self):

return conn

def set_execution_timeout(self, seconds: int):
self.sql("set session max_statement_time = %s", int(seconds))

def get_database_size(self):
"""'Returns database size in MB"""
db_size = self.sql(
Expand Down Expand Up @@ -154,6 +157,10 @@ def is_deadlocked(e):
def is_timedout(e):
return e.args[0] == ER.LOCK_WAIT_TIMEOUT

@staticmethod
def is_statement_timeout(e):
return e.args[0] == 1969

@staticmethod
def is_table_missing(e):
return e.args[0] == ER.NO_SUCH_TABLE
Expand Down
8 changes: 8 additions & 0 deletions frappe/database/postgres/database.py
Expand Up @@ -78,6 +78,10 @@ def get_connection(self):

return conn

def set_execution_timeout(self, seconds: int):
# Postgres expects milliseconds as input
self.sql("set local statement_timeout = %s", int(seconds) * 1000)

def escape(self, s, percent=True):
"""Excape quotes and percent in given string."""
if isinstance(s, bytes):
Expand Down Expand Up @@ -157,6 +161,10 @@ def is_timedout(e):
# http://initd.org/psycopg/docs/extensions.html?highlight=datatype#psycopg2.extensions.QueryCanceledError
return isinstance(e, psycopg2.extensions.QueryCanceledError)

@staticmethod
def is_statement_timeout(e):
return PostgresDatabase.is_timedout(e) or isinstance(e, frappe.QueryTimeoutError)

@staticmethod
def is_table_missing(e):
return getattr(e, "pgcode", None) == "42P01"
Expand Down
13 changes: 13 additions & 0 deletions frappe/tests/test_db.py
Expand Up @@ -95,6 +95,19 @@ def test_get_single_value(self):
# teardown
clear_custom_fields("Print Settings")

@run_only_if(db_type_is.MARIADB)
def test_db_statement_execution_timeout(self):
frappe.db.set_execution_timeout(2)
# Setting 0 means no timeout.
self.addCleanup(frappe.db.set_execution_timeout, 0)

try:
frappe.db.sql("select sleep(10)")
except Exception as e:
self.assertTrue(frappe.db.is_statement_timeout(e), f"exepcted {e} to be timeout error")
else:
self.fail("Long running queries not timing out")

def test_log_touched_tables(self):
frappe.flags.in_migrate = True
frappe.flags.touched_tables = set()
Expand Down

0 comments on commit 127763a

Please sign in to comment.