Merge pull request #278 from seriyps/generalize-socket-active-n
Generalize socket active n
seriyps committed Dec 28, 2022
2 parents 9283564 + 958eb97 commit a748d2b
Showing 8 changed files with 186 additions and 29 deletions.
59 changes: 52 additions & 7 deletions README.md
@@ -73,6 +73,7 @@ connect(Opts) -> {ok, Connection :: epgsql:connection()} | {error, Reason :: epg
port => inet:port_number(),
ssl => boolean() | required,
ssl_opts => [ssl:tls_client_option()], % @see OTP ssl documentation
socket_active => true | integer(), % @see "Active socket" section below
tcp_opts => [gen_tcp:option()], % @see OTP gen_tcp module documentation
timeout => timeout(), % socket connect timeout, default: 5000 ms
async => pid() | atom(), % process to receive LISTEN/NOTIFY msgs
@@ -125,10 +126,12 @@ Only `host` and `username` are mandatory, but most likely you would need `databa
- `application_name` is an optional string parameter. It is usually set by an application upon
connection to the server. The name will be displayed in the `pg_stat_activity`
view and included in CSV log entries.
- `socket_active` is an optional parameter, which can be true or integer in the range -32768
to 32767 (inclusive). This option only works in the replication mode and is used to control
the flow of incoming messages. See [Streaming replication protocol](#streaming-replication-protocol)
for more details.
- `socket_active` is an optional parameter, which can be `true` or an integer in the range -32768
to 32767 (inclusive; only positive values make sense right now).
This option controls the flow of incoming messages from the network socket so that
huge query results do not overflow the `epgsql` process mailbox. It affects the
behaviour of some commands and interfaces (`epgsqli` and replication), so use it with
caution! See [Active socket](#active-socket) for more details.

Options may be passed as proplist or as map with the same key names.

@@ -677,6 +680,48 @@ Retrieve actual value of server-side parameters, such as character endoding,
date/time format and timezone, server version and so on. See [libpq PQparameterStatus](https://www.postgresql.org/docs/current/static/libpq-status.html#LIBPQ-PQPARAMETERSTATUS).
Parameter's value may change during connection's lifetime.
## Active socket
By default `epgsql` puts its underlying `gen_tcp` or `ssl` socket into `{active, true}` mode
(make sure you understand the [OTP inet:setopts/2 documentation](https://www.erlang.org/doc/man/inet.html#setopts-2)
about the `active` option).
That means that if PostgreSQL quickly sends a huge amount of data to the client (for example,
the client ran a SELECT that returns a large number of rows, or the connection is in streaming
replication mode and is receiving a lot of updates), the underlying network socket might quickly
deliver a large number of messages to the `epgsql` connection process, leading to a growing
mailbox and high RAM consumption (or even an OOM situation in the case of a really large query
result or a massive replication update).
To avoid such scenarios, `epgsql` can rely on "TCP backpressure" to prevent the socket from sending
an unlimited number of messages, that is, to implement "flow control". To do so,
`socket_active => 1..32767` can be added at connection time. This option sets `{active, N}` on the
underlying socket, so that it delivers no more than `N` messages to the `epgsql` connection and then
pauses, letting `epgsql` and the client process the already received network data and then decide
how to proceed.
The way this pause is signalled to the client and how the socket can be activated again depends on
the interface the client is using:
- when the `epgsqli` interface is used, `epgsql` sends all the normal low-level messages and
at any point may send a `{epgsql, C, socket_passive}` message to signal that the socket has been
paused. `epgsql:activate(C)` must be called to re-activate the socket.
- when `epgsql` is connected in [Streaming replication](doc/streaming.md) mode and a `pid()` is used
as the receiver of the X-Log Data messages, it behaves in the same way:
`{epgsql, C, socket_passive}` might be sent along with
`{epgsql, self(), {x_log_data, _, _, _}}` messages and `epgsql:activate/1` can be used to
re-activate the socket.
- in all other cases (`epgsql` / `epgsqla` commands, while `COPY FROM STDIN` mode is active,
during Streaming replication with an Erlang callback module as the receiver of X-Log Data, or
while the connection is idle) `epgsql` transparently re-activates the socket itself: this
won't prevent high RAM usage from a large SELECT result, but it makes sure the `epgsql` process
has no more than `N` messages from the network in its mailbox.
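
The `epgsqli` case above can be sketched as a receive loop. This is a minimal illustration, not
part of `epgsql`: the module name `active_n_example` and the function `collect_rows` are
hypothetical, and the message shapes are the ones used by the test suite in this commit. The
activation function is passed in as an argument (an assumption made only so that the loop can be
exercised without a database).

```erlang
-module(active_n_example).
-export([collect_rows/2, collect_rows/3]).

%% Hypothetical helper: collect all rows of an `epgsqli' query.
%% C is the connection pid, Ref is the reference returned by e.g. epgsqli:squery/2.
collect_rows(C, Ref) ->
    collect_rows(C, Ref, fun epgsql:activate/1).

%% Activate is injectable so the loop can be tried without a live connection.
collect_rows(C, Ref, Activate) ->
    loop(C, Ref, Activate, []).

loop(C, Ref, Activate, Rows) ->
    receive
        {C, Ref, {data, Row}} ->
            loop(C, Ref, Activate, [Row | Rows]);
        {epgsql, C, socket_passive} ->
            %% The socket has delivered its N messages and paused; resume it.
            ok = Activate(C),
            loop(C, Ref, Activate, Rows);
        {C, Ref, {error, _} = Error} ->
            Error;
        {C, Ref, done} ->
            {ok, lists:reverse(Rows)};
        {C, Ref, _Other} ->
            %% e.g. {columns, _} or {complete, _}; ignored in this sketch
            loop(C, Ref, Activate, Rows)
    after 5000 ->
        {error, timeout}
    end.
```

A client that wants real backpressure would delay the `Activate(C)` call until it has finished
processing the rows received so far, instead of re-activating immediately as this sketch does.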

It is a good idea to combine `socket_active => N` with a specific value of
`tcp_opts => [{buffer, X}]`, since each of the `N` messages sent from the network to the `epgsql`
process contains no more than `X` bytes. So the MAXIMUM amount of data sitting in the `epgsql`
mailbox can be roughly estimated as `N * X`. For example, if `N = 256` and `X = 512*1024` (512 KiB),
then there will be no more than `N * X = 256 * 524288 = 134_217_728` bytes, or 128 MiB, of data in
the mailbox at the same time.
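
The estimate above can be written down as a small helper. This is only a sketch: the module
`active_n_budget` and both functions are hypothetical names, and the `host` and `username` values
are placeholders. The option keys (`socket_active`, `tcp_opts`, `buffer`) are the ones described
in this section.

```erlang
-module(active_n_budget).
-export([max_mailbox_bytes/2, connect_opts/2]).

%% Rough upper bound on the bytes waiting in the epgsql mailbox:
%% at most N socket messages, each carrying at most BufferBytes bytes.
max_mailbox_bytes(N, BufferBytes)
  when is_integer(N), N > 0, is_integer(BufferBytes), BufferBytes > 0 ->
    N * BufferBytes.

%% Connection options combining {active, N} with a fixed buffer size.
%% Host and username are placeholder values for illustration only.
connect_opts(N, BufferBytes) ->
    #{host => "localhost",
      username => "test_user",
      socket_active => N,
      tcp_opts => [{buffer, BufferBytes}]}.
```

With `N = 256` and a 512 KiB buffer, `active_n_budget:max_mailbox_bytes(256, 512 * 1024)` gives
the same 134217728 bytes as the estimate above, and the map returned by `connect_opts/2` could be
passed to `epgsql:connect/1`.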

## Streaming replication protocol

@@ -702,7 +747,7 @@ Here's how to create a patch that's easy to integrate:
- Create a new branch for the proposed fix.
- Make sure it includes a test and documentation, if appropriate.
- Open a pull request against the `devel` branch of epgsql.
- Passing build in travis
- Passing CI build

## Test Setup

@@ -712,11 +757,11 @@ Postgres database.
NOTE: you will need the postgis and hstore extensions to run these
tests! On Ubuntu, you can install them with a command like this:

1. `apt-get install postgresql-9.3-postgis-2.1 postgresql-contrib`
1. `apt-get install postgresql-12-postgis-3 postgresql-contrib`
1. `make test` # Runs the tests

NOTE 2: It's possible to run tests on exact postgres version by changing $PATH like
`PATH=$PATH:/usr/lib/postgresql/9.5/bin/ make test`
`PATH=$PATH:/usr/lib/postgresql/12/bin/ make test`
[![CI](https://github.com/epgsql/epgsql/actions/workflows/ci.yml/badge.svg)](https://github.com/epgsql/epgsql/actions/workflows/ci.yml)
6 changes: 4 additions & 2 deletions doc/streaming.md
@@ -155,8 +155,8 @@ Opts = #{host => "localhost",
{ok, Conn} = epgsql:connect(Opts).
```

It is currently allowed only in the replication mode. Its main purpose is to control the flow of
replication messages from Postgresql database. If a database is under a high load and a process, which
Its main purpose is to control the flow of replication messages from Postgresql database.
If a database is under a high load and a process, which
handles the message stream, cannot keep up with it then setting this option gives the handling process
ability to get messages on-demand.

@@ -175,3 +175,5 @@ the connection's options.

In the case of synchronous handler for replication messages `epgsql` will handle `socket_passive`
messages internally.

See [Active socket README section](../#active-socket) for more details.
2 changes: 1 addition & 1 deletion src/commands/epgsql_cmd_connect.erl
@@ -85,7 +85,7 @@ execute(PgSock, #connect{opts = #{username := Username} = Opts, stage = connect}
execute(PgSock, #connect{stage = auth, auth_send = {PacketType, Data}} = St) ->
{send, PacketType, Data, PgSock, St#connect{auth_send = undefined}}.

-spec open_socket([{atom(), any()}], epgsql:connect_opts()) ->
-spec open_socket([{atom(), any()}], epgsql:connect_opts_map()) ->
{ok , gen_tcp | ssl, gen_tcp:socket() | ssl:sslsocket()} | {error, any()}.
open_socket(SockOpts, #{host := Host} = ConnectOpts) ->
Timeout = maps:get(timeout, ConnectOpts, 5000),
10 changes: 5 additions & 5 deletions src/epgsql.erl
@@ -42,7 +42,7 @@
%% private
-export([handle_x_log_data/5]).

-export_type([connection/0, connect_option/0, connect_opts/0,
-export_type([connection/0, connect_option/0, connect_opts/0, connect_opts_map/0,
connect_error/0, query_error/0, sql_query/0, column/0,
type_name/0, epgsql_type/0, statement/0,
transaction_option/0, transaction_opts/0, socket_active/0]).
@@ -81,10 +81,8 @@
{replication, Replication :: string()} | % Pass "database" to connect in replication mode
{application_name, ApplicationName :: string()} |
{socket_active, Active :: socket_active()}.

-type connect_opts() ::
[connect_option()]
| #{host => host(),
-type connect_opts_map() ::
#{host => host(),
username => string(),
password => password(),
database => string(),
@@ -101,6 +99,8 @@
socket_active => socket_active()
}.

-type connect_opts() :: connect_opts_map() | [connect_option()].

-type transaction_option() ::
{reraise, boolean()} |
{ensure_committed, boolean()} |
38 changes: 27 additions & 11 deletions src/epgsql_sock.erl
@@ -78,7 +78,7 @@
| {cast, pid(), reference()}
| {incremental, pid(), reference()}.

-type tcp_socket() :: port(). %gen_tcp:socket() isn't exported prior to erl 18
-type tcp_socket() :: gen_tcp:socket().
-type repl_state() :: #repl{}.
-type copy_state() :: #copy{}.

@@ -102,7 +102,7 @@
txstatus :: byte() | undefined, % $I | $T | $E,
complete_status :: atom() | {atom(), integer()} | undefined,
subproto_state :: repl_state() | copy_state() | undefined,
connect_opts :: epgsql:connect_opts() | undefined}).
connect_opts :: epgsql:connect_opts_map() | undefined}).

-opaque pg_sock() :: #state{}.

@@ -195,9 +195,7 @@ set_attr(connect_opts, ConnectOpts, State) ->
%% XXX: be careful!
-spec set_packet_handler(atom(), pg_sock()) -> pg_sock().
set_packet_handler(Handler, State0) ->
State = State0#state{handler = Handler},
ok = activate_socket(State),
State.
State0#state{handler = Handler}.

-spec get_codec(pg_sock()) -> epgsql_binary:codec().
get_codec(#state{codec = Codec}) ->
@@ -290,7 +288,7 @@ handle_info({DataTag, Sock, Data2}, #state{data = Data, sock = Sock} = State)

handle_info({Passive, Sock}, #state{sock = Sock} = State)
when Passive == ssl_passive; Passive == tcp_passive ->
NewState = send_socket_pasive(State),
NewState = handle_socket_pasive(State),
{noreply, NewState};

handle_info({Closed, Sock}, #state{sock = Sock} = State)
@@ -328,11 +326,24 @@ format_status(terminate, [_PDict, State]) ->
State#state{rows = information_redacted}.

%% -- internal functions --
-spec send_socket_pasive(pg_sock()) -> pg_sock().
send_socket_pasive(#state{subproto_state = #repl{receiver = Rec}} = State) when Rec =/= undefined ->
-spec handle_socket_pasive(pg_sock()) -> pg_sock().
handle_socket_pasive(#state{handler = on_replication,
subproto_state = #repl{receiver = Rec}} = State) when is_pid(Rec) ->
%% Replication with pid() as X-Log data receiver
Rec ! {epgsql, self(), socket_passive},
State;
send_socket_pasive(State) ->
handle_socket_pasive(#state{current_cmd_transport = {incremental, From, _}} = State) ->
%% `epgsqli' interface command
From ! {epgsql, self(), socket_passive},
State;
handle_socket_pasive(State) ->
%% - current_cmd_transport is `call' or `cast': client expects whole result set anyway
%% - handler = on_copy_from_stdin: we don't expect much data from the server
%% - handler = on_replication with callback module as X-Log data receiver: pace controlled by
%% callback execution time
%% - idle (eg, receiving asynchronous error or NOTIFICATION/WARNING): client might not expect
%% to receive the `socket_passive' messages or there might be no client at all. Also, async
%% notifications are usually small.
ok = activate_socket(State),
State.

@@ -436,7 +447,7 @@ setopts(#state{mod = Mod, sock = Sock}, Opts) ->
end.

-spec get_socket_active(pg_sock()) -> epgsql:socket_active().
get_socket_active(#state{handler = on_replication, connect_opts = #{socket_active := Active}}) ->
get_socket_active(#state{connect_opts = #{socket_active := Active}}) ->
Active;
get_socket_active(_State) ->
true.
@@ -465,7 +476,12 @@ send_multi(#state{mod = Mod, sock = Sock}, List) ->
do_send(gen_tcp, Sock, Bin) ->
%% Why not gen_tcp:send/2?
%% See https://github.com/rabbitmq/rabbitmq-common/blob/v3.7.4/src/rabbit_writer.erl#L367-L384
%% Because of that we also have `handle_info({inet_reply, ...`
%% Since `epgsql' uses `{active, true}' socket option by-default, it may potentially quickly
%% receive huge amount of data from the network.
%% With introduction of `{socket_active, N}' option it becomes less of a problem, but
%% `{active, true}' is still the default.
%%
%% Because we use `inet' driver directly, we also have `handle_info({inet_reply, ...`
try erlang:port_command(Sock, Bin) of
true ->
ok
67 changes: 66 additions & 1 deletion test/epgsql_SUITE.erl
@@ -77,6 +77,10 @@ groups() ->
pipelined_prepared_query,
pipelined_parse_batch_execute
]},
{incremental_sock_active, [parallel], [
incremental_sock_active_n,
incremental_sock_active_n_ssl
]},
{generic, [parallel], [
with_transaction,
mixed_api
@@ -145,7 +149,7 @@ groups() ->
SubGroups ++
[{epgsql, [], [{group, generic} | Tests]},
{epgsql_cast, [], [{group, pipelining} | Tests]},
{epgsql_incremental, [], Tests}].
{epgsql_incremental, [], [{group, incremental_sock_active} | Tests]}].

end_per_suite(_Config) ->
ok.
@@ -1609,6 +1613,67 @@ pipelined_parse_batch_execute(Config) ->
end || Ref <- CloseRefs],
erlang:cancel_timer(Timer)
end).

incremental_sock_active_n(Config) ->
epgsql_incremental = ?config(module, Config),
Q = "SELECT *, 'Hello world' FROM generate_series(0, 10240)",
epgsql_ct:with_connection(Config,
fun(C) ->
Ref = epgsqli:squery(C, Q),
{done, NumPassive, Others, Rows} = recv_incremental_active_n(C, Ref),
?assertMatch([{columns, _}, {complete, _}], Others),
?assert(NumPassive > 0),
?assertMatch([{<<"0">>, <<"Hello world">>},
{<<"1">>, <<"Hello world">>} | _], Rows),
?assertEqual(10241, length(Rows))
end,
"epgsql_test",
[{socket_active, 2}]).

-ifdef(OTP_RELEASE).
incremental_sock_active_n_ssl(Config) ->
epgsql_incremental = ?config(module, Config),
Q = "SELECT *, 'Hello world' FROM generate_series(0, 10240)",
epgsql_ct:with_connection(Config,
fun(C) ->
Ref = epgsqli:squery(C, Q),
{done, NumPassive, Others, Rows} = recv_incremental_active_n(C, Ref),
?assertMatch([{columns, _}, {complete, _}], Others),
?assert(NumPassive > 0),
?assertMatch([{<<"0">>, <<"Hello world">>},
{<<"1">>, <<"Hello world">>} | _], Rows),
?assertEqual(10241, length(Rows))
end,
"epgsql_test",
[{ssl, true}, {socket_active, 2}]).
-else.
%% {active, N} for SSL is only supported on OTP-21+
incremental_sock_active_n_ssl(_Config) ->
noop.
-endif.

recv_incremental_active_n(C, Ref) ->
recv_incremental_active_n(C, Ref, 0, [], []).

recv_incremental_active_n(C, Ref, NumPassive, Rows, Others) ->
receive
{C, Ref, {data, Row}} ->
recv_incremental_active_n(C, Ref, NumPassive, [Row | Rows], Others);
{epgsql, C, socket_passive} ->
ok = epgsql:activate(C),
recv_incremental_active_n(C, Ref, NumPassive + 1, Rows, Others);
{C, Ref, {error, _} = E} ->
E;
{C, Ref, done} ->
{done, NumPassive, lists:reverse(Others), lists:reverse(Rows)};
{C, Ref, Other} ->
recv_incremental_active_n(C, Ref, NumPassive, Rows, [Other | Others]);
Other ->
recv_incremental_active_n(C, Ref, NumPassive, Rows, [Other | Others])
after 5000 ->
error({timeout, NumPassive, Others, Rows})
end.

%% =============================================================================
%% Internal functions
%% ============================================================================
20 changes: 19 additions & 1 deletion test/epgsql_incremental.erl
@@ -40,7 +40,10 @@ await_connect(Ref, Opts0) ->
{C, Ref, connected} ->
{ok, C};
{_C, Ref, Error = {error, _}} ->
Error
Error;
{epgsql, C, socket_passive} ->
ok = epgsql:activate(C),
await_connect(Ref, Opts0)
after Timeout ->
error(timeout)
end.
@@ -200,6 +203,9 @@ receive_result(C, Ref, Cols, Rows) ->
{ok, Cols, lists:reverse(Rows)};
{C, Ref, done} ->
done;
{epgsql, C, socket_passive} ->
ok = epgsql:activate(C),
receive_result(C, Ref, Cols, Rows);
{'EXIT', C, _Reason} ->
throw({error, closed})
end.
@@ -229,6 +235,9 @@ receive_extended_result(C, Ref, Rows) ->
{ok, lists:reverse(Rows)};
{C, Ref, done} ->
done;
{epgsql, C, socket_passive} ->
ok = epgsql:activate(C),
receive_extended_result(C, Ref, Rows);
{'EXIT', C, _Reason} ->
{error, closed}
end.
@@ -243,6 +252,9 @@ receive_describe(C, Ref, Statement = #statement{}) ->
{ok, Statement#statement{columns = []}};
{C, Ref, Error = {error, _}} ->
Error;
{epgsql, C, socket_passive} ->
ok = epgsql:activate(C),
receive_describe(C, Ref, Statement);
{'EXIT', C, _Reason} ->
{error, closed}
end.
@@ -255,6 +267,9 @@ receive_describe_portal(C, Ref) ->
{ok, []};
{C, Ref, Error = {error, _}} ->
Error;
{epgsql, C, socket_passive} ->
ok = epgsql:activate(C),
receive_describe_portal(C, Ref);
{'EXIT', C, _Reason} ->
{error, closed}
end.
@@ -265,6 +280,9 @@ receive_atom(C, Ref, Receive, Return) ->
Return;
{C, Ref, Error = {error, _}} ->
Error;
{epgsql, C, socket_passive} ->
ok = epgsql:activate(C),
receive_atom(C, Ref, Receive, Return);
{'EXIT', C, _Reason} ->
{error, closed}
end.