diff --git a/CHANGELOG/unreleased/kong/11480.yaml b/CHANGELOG/unreleased/kong/11480.yaml new file mode 100644 index 00000000000..96f39635558 --- /dev/null +++ b/CHANGELOG/unreleased/kong/11480.yaml @@ -0,0 +1,7 @@ +message: Fix a problem that abnormal socket connection will be reused when querying Postgres database. +type: bugfix +scope: Core +prs: + - 11480 +jiras: + - "FTI-5322" diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index 8bef29b3ea4..342608fd8a7 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -502,6 +502,7 @@ function _mt:query(sql, operation) operation = "write" end + local conn, is_new_conn local res, err, partial, num_queries local ok @@ -510,23 +511,36 @@ function _mt:query(sql, operation) return nil, "error acquiring query semaphore: " .. err end - local conn = self:get_stored_connection(operation) - if conn then - res, err, partial, num_queries = conn:query(sql) - - else - local connection + conn = self:get_stored_connection(operation) + if not conn then local config = operation == "write" and self.config or self.config_ro - connection, err = connect(config) - if not connection then + conn, err = connect(config) + if not conn then self:release_query_semaphore_resource(operation) return nil, err end + is_new_conn = true + end + + res, err, partial, num_queries = conn:query(sql) - res, err, partial, num_queries = connection:query(sql) + -- if err is string then either it is a SQL error + -- or it is a socket error, here we abort connections + -- that encounter errors instead of reusing them, for + -- safety reason + if err and type(err) == "string" then + ngx.log(ngx.DEBUG, "SQL query throw error: ", err, ", close connection") + local _, err = conn:disconnect() + if err then + -- We're at the end of the query - just logging if + -- we cannot cleanup the connection + ngx.log(ngx.ERR, "failed to disconnect: ", err) + end + self.store_connection(nil, operation) - setkeepalive(connection) + elseif is_new_conn then + setkeepalive(conn) end self:release_query_semaphore_resource(operation) diff --git a/spec/02-integration/03-db/01-db_spec.lua b/spec/02-integration/03-db/01-db_spec.lua index 10af4723125..e28c394a429 100644 --- a/spec/02-integration/03-db/01-db_spec.lua +++ b/spec/02-integration/03-db/01-db_spec.lua @@ -450,6 +450,50 @@ for _, strategy in helpers.each_strategy() do end) end) + describe("#testme :query() [#" .. strategy .. "]", function() + lazy_setup(function() + helpers.get_db_utils(strategy, {}) + end) + + postgres_only("establish new connection when error occurred", function() + ngx.IS_CLI = false + + local conf = utils.deep_copy(helpers.test_conf) + conf.pg_ro_host = conf.pg_host + conf.pg_ro_user = conf.pg_user + + local db, err = DB.new(conf, strategy) + + assert.is_nil(err) + assert.is_table(db) + assert(db:init_connector()) + assert(db:connect()) + + local res, err = db.connector:query("SELECT now();") + assert.not_nil(res) + assert.is_nil(err) + + local old_conn = db.connector:get_stored_connection("write") + assert.not_nil(old_conn) + + local res, err = db.connector:query("SELECT * FROM not_exist_table;") + assert.is_nil(res) + assert.not_nil(err) + + local new_conn = db.connector:get_stored_connection("write") + assert.is_nil(new_conn) + + local res, err = db.connector:query("SELECT now();") + assert.not_nil(res) + assert.is_nil(err) + + local res, err = db.connector:query("SELECT now();") + assert.not_nil(res) + assert.is_nil(err) + + assert(db:close()) + end) + end) describe(":setkeepalive() [#" .. strategy .. "]", function() lazy_setup(function()