Skip to content

Commit

Permalink
feat(db): exposed postgres connection pooling config (#9603)
Browse files Browse the repository at this point in the history
* feat: exposed postgres connection pooling config

* added test cases

* updated kong.conf

* updated kong.conf

* removed invalid unit test

* addressed review comments

* addressed review comments 2

* added integration tests

* review comment fixes

* fixed flaky test

* removed thread wait in test case

* fixed lint error

* Update CHANGELOG.md

* moved the change to unreleased section in changelog

* Update CHANGELOG.md

Co-authored-by: Hans Hübner <hans.huebner@gmail.com>
Co-authored-by: Samuele Illuminati <samuele@konghq.com>
Co-authored-by: Wangchong Zhou <wangchong@konghq.com>
Co-authored-by: Mayo <mayocream39@yahoo.co.jp>
  • Loading branch information
5 people committed Dec 16, 2022
1 parent a648bdd commit f132418
Show file tree
Hide file tree
Showing 7 changed files with 356 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@

- Add back Postgres `FLOOR` function when calculating `ttl`, so the returned `ttl` is always a whole integer.
[#9960](https://github.com/Kong/kong/pull/9960)
- Expose postgres connection pool configuration
[#9603](https://github.com/Kong/kong/pull/9603)

#### Plugins

Expand Down
41 changes: 41 additions & 0 deletions kong.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,35 @@
# Detailed discussion of this behavior is
# available in the online documentation.

#pg_keepalive_timeout = # Specify the maximal idle timeout (in ms)
# for the postgres connections in the pool.
# If this value is set to 0 then the timeout interval
# is unlimited.
#
# If not specified this value will be same as
# `lua_socket_keepalive_timeout`

#pg_pool_size = # Specifies the size limit (in terms of connection
# count) for the Postgres server.
# Note that this connection pool is intended
# per Nginx worker rather than per Kong instance.
#
# If not specified, the default value is the same as
# `lua_socket_pool_size`

#pg_backlog = # If specified, this value will limit the total
# number of open connections to the Postgres
# server to `pg_pool_size`. If the connection
# pool is full, subsequent connect operations
# will be inserted in a queue with size equal
# to this option's value.
#
# If the number of queued connect operations
# reaches `pg_backlog`, exceeding connections will fail.
#
# If not specified, then number of open connections
# to the Postgres server is not limited.

#pg_ro_host = # Same as `pg_host`, but for the
# read-only connection.
# **Note:** Refer to the documentation
Expand Down Expand Up @@ -1185,6 +1214,18 @@
# Same as `pg_semaphore_timeout`, but for the
# read-only connection.

#pg_ro_keepalive_timeout = <pg_keepalive_timeout>
# Same as `pg_keepalive_timeout`, but for the
# read-only connection.

#pg_ro_pool_size = <pg_pool_size>
# Same as `pg_pool_size`, but for the
# read-only connection.

#pg_ro_backlog = <pg_backlog>
# Same as `pg_backlog`, but for the
# read-only connection.

#cassandra_contact_points = 127.0.0.1 # A comma-separated list of contact
# points to your cluster.
# You may specify IP addresses or
Expand Down
66 changes: 66 additions & 0 deletions kong/conf_loader/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,9 @@ local CONF_INFERENCES = {
pg_ssl_verify = { typ = "boolean" },
pg_max_concurrent_queries = { typ = "number" },
pg_semaphore_timeout = { typ = "number" },
pg_keepalive_timeout = { typ = "number" },
pg_pool_size = { typ = "number" },
pg_backlog = { typ = "number" },

pg_ro_port = { typ = "number" },
pg_ro_timeout = { typ = "number" },
Expand All @@ -364,6 +367,9 @@ local CONF_INFERENCES = {
pg_ro_ssl_verify = { typ = "boolean" },
pg_ro_max_concurrent_queries = { typ = "number" },
pg_ro_semaphore_timeout = { typ = "number" },
pg_ro_keepalive_timeout = { typ = "number" },
pg_ro_pool_size = { typ = "number" },
pg_ro_backlog = { typ = "number" },

cassandra_contact_points = { typ = "array" },
cassandra_port = { typ = "number" },
Expand Down Expand Up @@ -964,6 +970,36 @@ local function check_and_infer(conf, opts)
errors[#errors + 1] = "pg_semaphore_timeout must be an integer greater than 0"
end

if conf.pg_keepalive_timeout then
if conf.pg_keepalive_timeout < 0 then
errors[#errors + 1] = "pg_keepalive_timeout must be greater than 0"
end

if conf.pg_keepalive_timeout ~= floor(conf.pg_keepalive_timeout) then
errors[#errors + 1] = "pg_keepalive_timeout must be an integer greater than 0"
end
end

if conf.pg_pool_size then
if conf.pg_pool_size < 0 then
errors[#errors + 1] = "pg_pool_size must be greater than 0"
end

if conf.pg_pool_size ~= floor(conf.pg_pool_size) then
errors[#errors + 1] = "pg_pool_size must be an integer greater than 0"
end
end

if conf.pg_backlog then
if conf.pg_backlog < 0 then
errors[#errors + 1] = "pg_backlog must be greater than 0"
end

if conf.pg_backlog ~= floor(conf.pg_backlog) then
errors[#errors + 1] = "pg_backlog must be an integer greater than 0"
end
end

if conf.pg_ro_max_concurrent_queries then
if conf.pg_ro_max_concurrent_queries < 0 then
errors[#errors + 1] = "pg_ro_max_concurrent_queries must be greater than 0"
Expand All @@ -984,6 +1020,36 @@ local function check_and_infer(conf, opts)
end
end

if conf.pg_ro_keepalive_timeout then
if conf.pg_ro_keepalive_timeout < 0 then
errors[#errors + 1] = "pg_ro_keepalive_timeout must be greater than 0"
end

if conf.pg_ro_keepalive_timeout ~= floor(conf.pg_ro_keepalive_timeout) then
errors[#errors + 1] = "pg_ro_keepalive_timeout must be an integer greater than 0"
end
end

if conf.pg_ro_pool_size then
if conf.pg_ro_pool_size < 0 then
errors[#errors + 1] = "pg_ro_pool_size must be greater than 0"
end

if conf.pg_ro_pool_size ~= floor(conf.pg_ro_pool_size) then
errors[#errors + 1] = "pg_ro_pool_size must be an integer greater than 0"
end
end

if conf.pg_ro_backlog then
if conf.pg_ro_backlog < 0 then
errors[#errors + 1] = "pg_ro_backlog must be greater than 0"
end

if conf.pg_ro_backlog ~= floor(conf.pg_ro_backlog) then
errors[#errors + 1] = "pg_ro_backlog must be an integer greater than 0"
end
end

if conf.worker_state_update_frequency <= 0 then
errors[#errors + 1] = "worker_state_update_frequency must be greater than 0"
end
Expand Down
30 changes: 25 additions & 5 deletions kong/db/strategies/postgres/connector.lua
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ local function reconnect(config)
"SET TIME ZONE ", connection:escape_literal("UTC"), ";",
})
if not ok then
setkeepalive(connection)
setkeepalive(connection, config.keepalive_timeout)
return nil, err
end
end
Expand All @@ -247,7 +247,7 @@ local function connect(config)
end


setkeepalive = function(connection)
setkeepalive = function(connection, keepalive_timeout)
if not connection or not connection.sock then
return true
end
Expand All @@ -259,7 +259,7 @@ setkeepalive = function(connection)
end

else
local _, err = connection:keepalive()
local _, err = connection:keepalive(keepalive_timeout)
if err then
return nil, err
end
Expand All @@ -284,6 +284,14 @@ function _mt:get_stored_connection(operation)
end
end

function _mt:get_keepalive_timeout(operation)
if self.config_ro and operation == 'read' then
return self.config_ro.keepalive_timeout
end

return self.config.keepalive_timeout
end


function _mt:init()
local res, err = self:query("SHOW server_version_num;")
Expand Down Expand Up @@ -433,7 +441,8 @@ function _mt:setkeepalive()
for operation in pairs(OPERATIONS) do
local conn = self:get_stored_connection(operation)
if conn then
local _, err = setkeepalive(conn)
local keepalive_timeout = self:get_keepalive_timeout(operation)
local _, err = setkeepalive(conn, keepalive_timeout)

self:store_connection(nil, operation)

Expand Down Expand Up @@ -526,7 +535,8 @@ function _mt:query(sql, operation)

res, err, partial, num_queries = connection:query(sql)

setkeepalive(connection)
local keepalive_timeout = self:get_keepalive_timeout(operation)
setkeepalive(connection, keepalive_timeout)
end

self:release_query_semaphore_resource(operation)
Expand Down Expand Up @@ -922,6 +932,11 @@ function _M.new(kong_config)
cafile = kong_config.lua_ssl_trusted_certificate_combined,
sem_max = kong_config.pg_max_concurrent_queries or 0,
sem_timeout = (kong_config.pg_semaphore_timeout or 60000) / 1000,
pool_size = kong_config.pg_pool_size,
backlog = kong_config.pg_backlog,

--- not used directly by pgmoon, but used internally in connector to set the keepalive timeout
keepalive_timeout = kong_config.pg_keepalive_timeout,
}

local refs = kong_config["$refs"]
Expand Down Expand Up @@ -972,6 +987,11 @@ function _M.new(kong_config)
sem_max = kong_config.pg_ro_max_concurrent_queries,
sem_timeout = kong_config.pg_ro_semaphore_timeout and
(kong_config.pg_ro_semaphore_timeout / 1000) or nil,
pool_size = kong_config.pg_ro_pool_size,
backlog = kong_config.pg_ro_backlog,

--- not used directly by pgmoon, but used internally in connector to set the keepalive timeout
keepalive_timeout = kong_config.pg_ro_keepalive_timeout,
}

if refs then
Expand Down
6 changes: 6 additions & 0 deletions kong/templates/kong_defaults.lua
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ pg_ssl = off
pg_ssl_verify = off
pg_max_concurrent_queries = 0
pg_semaphore_timeout = 60000
pg_keepalive_timeout = NONE
pg_pool_size = NONE
pg_backlog = NONE
pg_ro_host = NONE
pg_ro_port = NONE
Expand All @@ -116,6 +119,9 @@ pg_ro_ssl = NONE
pg_ro_ssl_verify = NONE
pg_ro_max_concurrent_queries = NONE
pg_ro_semaphore_timeout = NONE
pg_ro_keepalive_timeout = NONE
pg_ro_pool_size = NONE
pg_ro_backlog = NONE
cassandra_contact_points = 127.0.0.1
cassandra_port = 9042
Expand Down
100 changes: 100 additions & 0 deletions spec/01-unit/03-conf_loader_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1463,6 +1463,106 @@ describe("Configuration loader", function()
end)
end)

describe("pg connection pool options", function()
it("rejects a pg_keepalive_timeout with a negative number", function()
local conf, err = conf_loader(nil, {
pg_keepalive_timeout = -1,
})
assert.is_nil(conf)
assert.equal("pg_keepalive_timeout must be greater than 0", err)
end)

it("rejects a pg_keepalive_timeout with a decimal", function()
local conf, err = conf_loader(nil, {
pg_keepalive_timeout = 0.1,
})
assert.is_nil(conf)
assert.equal("pg_keepalive_timeout must be an integer greater than 0", err)
end)

it("rejects a pg_pool_size with a negative number", function()
local conf, err = conf_loader(nil, {
pg_pool_size = -1,
})
assert.is_nil(conf)
assert.equal("pg_pool_size must be greater than 0", err)
end)

it("rejects a pg_pool_size with a decimal", function()
local conf, err = conf_loader(nil, {
pg_pool_size = 0.1,
})
assert.is_nil(conf)
assert.equal("pg_pool_size must be an integer greater than 0", err)
end)

it("rejects a pg_backlog with a negative number", function()
local conf, err = conf_loader(nil, {
pg_backlog = -1,
})
assert.is_nil(conf)
assert.equal("pg_backlog must be greater than 0", err)
end)

it("rejects a pg_backlog with a decimal", function()
local conf, err = conf_loader(nil, {
pg_backlog = 0.1,
})
assert.is_nil(conf)
assert.equal("pg_backlog must be an integer greater than 0", err)
end)
end)

describe("pg read-only connection pool options", function()
it("rejects a pg_ro_keepalive_timeout with a negative number", function()
local conf, err = conf_loader(nil, {
pg_ro_keepalive_timeout = -1,
})
assert.is_nil(conf)
assert.equal("pg_ro_keepalive_timeout must be greater than 0", err)
end)

it("rejects a pg_ro_keepalive_timeout with a decimal", function()
local conf, err = conf_loader(nil, {
pg_ro_keepalive_timeout = 0.1,
})
assert.is_nil(conf)
assert.equal("pg_ro_keepalive_timeout must be an integer greater than 0", err)
end)

it("rejects a pg_ro_pool_size with a negative number", function()
local conf, err = conf_loader(nil, {
pg_ro_pool_size = -1,
})
assert.is_nil(conf)
assert.equal("pg_ro_pool_size must be greater than 0", err)
end)

it("rejects a pg_ro_pool_size with a decimal", function()
local conf, err = conf_loader(nil, {
pg_ro_pool_size = 0.1,
})
assert.is_nil(conf)
assert.equal("pg_ro_pool_size must be an integer greater than 0", err)
end)

it("rejects a pg_ro_backlog with a negative number", function()
local conf, err = conf_loader(nil, {
pg_ro_backlog = -1,
})
assert.is_nil(conf)
assert.equal("pg_ro_backlog must be greater than 0", err)
end)

it("rejects a pg_ro_backlog with a decimal", function()
local conf, err = conf_loader(nil, {
pg_ro_backlog = 0.1,
})
assert.is_nil(conf)
assert.equal("pg_ro_backlog must be an integer greater than 0", err)
end)
end)

describe("worker_state_update_frequency option", function()
it("is rejected with a zero", function()
local conf, err = conf_loader(nil, {
Expand Down
Loading

1 comment on commit f132418

@khcp-gha-bot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:f1324182cad8f2f702df679ec0c0b3cd13f1bd16

Please sign in to comment.