Expand Up
@@ -30,6 +30,10 @@
% %% some conflicting low-level commands (such as `parse', `bind', `execute') are
% %% executed in a wrong order. In this case server and epgsql states become out of
% %% sync and {@link epgsql_cmd_sync} have to be executed in order to recover.
% %%
% %% {@link epgsql_cmd_copy_from_stdin} and {@link epgsql_cmd_start_replication} switches the
% %% "state machine" of connection process to a special "COPY mode" subprotocol.
% %% See [https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY].
% %% @see epgsql_cmd_connect. epgsql_cmd_connect for network connection and authentication setup
% %% @end
% %% Copyright (C) 2009 - Will Glozer. All rights reserved.
Expand All
@@ -46,40 +50,44 @@
get_parameter /2 ,
set_notice_receiver /2 ,
get_cmd_status /1 ,
cancel /1 ]).
cancel /1 ,
copy_send_rows /3 ,
standby_status_update /3 ]).
-export ([handle_call /3 , handle_cast /2 , handle_info /2 ]).
-export ([init /1 , code_change /3 , terminate /2 ]).
% % loop callback
-export ([on_message /3 , on_replication /3 ]).
-export ([on_message /3 , on_replication /3 , on_copy_from_stdin / 3 ]).
% % Comand's APIs
-export ([set_net_socket /3 , init_replication_state /1 , set_attr /3 , get_codec /1 ,
get_rows /1 , get_results /1 , notify /2 , send /2 , send /3 , send_multi /2 ,
get_parameter_internal /2 ,
get_replication_state /1 , set_packet_handler /2 ]).
get_subproto_state /1 , set_packet_handler /2 ]).
-export_type ([transport / 0 , pg_sock / 0 , error / 0 ]).
-include (" epgsql.hrl" ).
-include (" protocol.hrl" ).
-include (" epgsql_replication.hrl" ).
-include (" epgsql_copy.hrl" ).
-type transport () :: {call , any ()}
| {cast , pid (), reference ()}
| {incremental , pid (), reference ()}.
-type tcp_socket () :: port (). % gen_tcp:socket() isn't exported prior to erl 18
-type repl_state () :: # repl {}.
-type copy_state () :: # copy {}.
-type error () :: {error , sync_required | closed | sock_closed | sock_error }.
-record (state , {mod :: gen_tcp | ssl | undefined ,
sock :: tcp_socket () | ssl :sslsocket () | undefined ,
data = <<>>,
backend :: {Pid :: integer (), Key :: integer ()} | undefined ,
handler = on_message :: on_message | on_replication | undefined ,
handler = on_message :: on_message | on_replication | on_copy_from_stdin | undefined ,
codec :: epgsql_binary :codec () | undefined ,
queue = queue :new () :: queue :queue ({epgsql_command :command (), any (), transport ()}),
current_cmd :: epgsql_command :command () | undefined ,
Expand All
@@ -92,11 +100,17 @@
sync_required :: boolean () | undefined ,
txstatus :: byte () | undefined , % $I | $T | $E,
complete_status :: atom () | {atom (), integer ()} | undefined ,
repl :: repl_state () | undefined ,
subproto_state :: repl_state () | copy_state () | undefined ,
connect_opts :: epgsql :connect_opts () | undefined }).
-opaque pg_sock () :: # state {}.
-ifndef (OTP_RELEASE ). % pre-OTP21
-define (WITH_STACKTRACE (T , R , S ), T :R -> S = erlang :get_stacktrace (), ).
-else .
-define (WITH_STACKTRACE (T , R , S ), T :R :S -> ).
-endif .
% % -- client interface --
start_link () ->
Expand Down
Expand Up
@@ -131,6 +145,12 @@ get_cmd_status(C) ->
cancel (S ) ->
gen_server :cast (S , cancel ).
copy_send_rows (C , Rows , Timeout ) ->
gen_server :call (C , {copy_send_rows , Rows }, Timeout ).
standby_status_update (C , FlushedLSN , AppliedLSN ) ->
gen_server :call (C , {standby_status_update , FlushedLSN , AppliedLSN }).
% % -- command APIs --
Expand All
@@ -145,7 +165,7 @@ set_net_socket(Mod, Socket, State) ->
-spec init_replication_state (pg_sock ()) -> pg_sock ().
init_replication_state (State ) ->
State # state {repl = # repl {}}.
State # state {subproto_state = # repl {}}.
-spec set_attr (atom (), any (), pg_sock ()) -> pg_sock ().
set_attr (backend , {_Pid , _Key } = Backend , State ) ->
Expand All
@@ -158,8 +178,8 @@ set_attr(codec, Codec, State) ->
State # state {codec = Codec };
set_attr (sync_required , Value , State ) ->
State # state {sync_required = Value };
set_attr (replication_state , Value , State ) ->
State # state {repl = Value };
set_attr (subproto_state , Value , State ) ->
State # state {subproto_state = Value };
set_attr (connect_opts , ConnectOpts , State ) ->
State # state {connect_opts = ConnectOpts }.
Expand All
@@ -172,9 +192,9 @@ set_packet_handler(Handler, State) ->
get_codec (# state {codec = Codec }) ->
Codec .
-spec get_replication_state (pg_sock ()) -> repl_state ().
get_replication_state (# state {repl = Repl }) ->
Repl .
-spec get_subproto_state (pg_sock ()) -> repl_state () | copy_state () | undefined .
get_subproto_state (# state {subproto_state = SubState }) ->
SubState .
-spec get_rows (pg_sock ()) -> [tuple ()].
get_rows (# state {rows = Rows }) ->
Expand All
@@ -197,6 +217,10 @@ get_parameter_internal(Name, #state{parameters = Parameters}) ->
init ([]) ->
{ok , # state {}}.
handle_call ({command , Command , Args }, From , State ) ->
Transport = {call , From },
command_new (Transport , Command , Args , State );
handle_call ({get_parameter , Name }, _From , State ) ->
{reply , {ok , get_parameter_internal (Name , State )}, State };
Expand All
@@ -208,14 +232,16 @@ handle_call(get_cmd_status, _From, #state{complete_status = Status} = State) ->
handle_call ({standby_status_update , FlushedLSN , AppliedLSN }, _From ,
# state {handler = on_replication ,
repl = # repl {last_received_lsn = ReceivedLSN } = Repl } = State ) ->
subproto_state = # repl {last_received_lsn = ReceivedLSN } = Repl } = State ) ->
send (State , ? COPY_DATA , epgsql_wire :encode_standby_status_update (ReceivedLSN , FlushedLSN , AppliedLSN )),
Repl1 = Repl # repl {last_flushed_lsn = FlushedLSN ,
last_applied_lsn = AppliedLSN },
{reply , ok , State # state {repl = Repl1 }};
handle_call ({command , Command , Args }, From , State ) ->
Transport = {call , From },
command_new (Transport , Command , Args , State ).
{reply , ok , State # state {subproto_state = Repl1 }};
handle_call ({copy_send_rows , Rows }, _From ,
# state {handler = Handler , subproto_state = CopyState } = State ) ->
Response = handle_copy_send_rows (Rows , Handler , CopyState , State ),
{reply , Response , State }.
handle_cast ({{Method , From , Ref } = Transport , Command , Args }, State )
when ((Method == cast ) or (Method == incremental )),
Expand All
@@ -241,6 +267,10 @@ handle_cast(cancel, State = #state{backend = {Pid, Key},
end ,
{noreply , State }.
handle_info ({DataTag , Sock , Data2 }, # state {data = Data , sock = Sock } = State )
when DataTag == tcp ; DataTag == ssl ->
loop (State # state {data = <<Data /binary , Data2 /binary >>});
handle_info ({Closed , Sock }, # state {sock = Sock } = State )
when Closed == tcp_closed ; Closed == ssl_closed ->
{stop , sock_closed , flush_queue (State # state {sock = undefined }, {error , sock_closed })};
Expand All
@@ -256,8 +286,10 @@ handle_info({inet_reply, _, ok}, State) ->
handle_info ({inet_reply , _ , Status }, State ) ->
{stop , Status , flush_queue (State , {error , Status })};
handle_info ({_ , Sock , Data2 }, # state {data = Data , sock = Sock } = State ) ->
loop (State # state {data = <<Data /binary , Data2 /binary >>}).
handle_info ({io_request , From , ReplyAs , Request }, State ) ->
Response = handle_io_request (Request , State ),
io_reply (Response , From , ReplyAs ),
{noreply , State }.
terminate (_Reason , # state {sock = undefined }) -> ok ;
terminate (_Reason , # state {mod = gen_tcp , sock = Sock }) -> gen_tcp :close (Sock );
Expand Down
Expand Up
@@ -398,7 +430,7 @@ do_send(gen_tcp, Sock, Bin) ->
do_send (ssl , Sock , Bin ) ->
ssl :send (Sock , Bin ).
loop (# state {data = Data , handler = Handler , repl = Repl } = State ) ->
loop (# state {data = Data , handler = Handler , subproto_state = Repl } = State ) ->
case epgsql_wire :decode_message (Data ) of
{Type , Payload , Tail } ->
case ? MODULE :Handler (Type , Payload , State # state {data = Tail }) of
Expand All
@@ -409,14 +441,16 @@ loop(#state{data = Data, handler = Handler, repl = Repl} = State) ->
end ;
_ ->
% % in replication mode send feedback after each batch of messages
case (Repl =/= undefined ) andalso (Repl # repl .feedback_required ) of
case Handler == on_replication
andalso (Repl =/= undefined )
andalso (Repl # repl .feedback_required ) of
true ->
# repl {last_received_lsn = LastReceivedLSN ,
last_flushed_lsn = LastFlushedLSN ,
last_applied_lsn = LastAppliedLSN } = Repl ,
send (State , ? COPY_DATA , epgsql_wire :encode_standby_status_update (
LastReceivedLSN , LastFlushedLSN , LastAppliedLSN )),
{noreply , State # state {repl = Repl # repl {feedback_required = false }}};
{noreply , State # state {subproto_state = Repl # repl {feedback_required = false }}};
_ ->
{noreply , State }
end
Expand Down
Expand Up
@@ -486,6 +520,74 @@ flush_queue(#state{current_cmd = undefined} = State, _) ->
flush_queue (State , Error ) ->
flush_queue (finish (State , Error ), Error ).
% % @doc Handler for IO protocol version of COPY FROM STDIN
% %
% % COPY FROM STDIN is implemented as Erlang
% % <a href="https://erlang.org/doc/apps/stdlib/io_protocol.html">io protocol</a>.
handle_io_request (_ , # state {handler = Handler }) when Handler =/= on_copy_from_stdin ->
% % Received IO request when `epgsql_cmd_copy_from_stdin' haven't yet been called or it was
% % terminated with error and already sent `ReadyForQuery'
{error , not_in_copy_mode };
handle_io_request (_ , # state {subproto_state = # copy {last_error = Err }}) when Err =/= undefined ->
{error , Err };
handle_io_request ({put_chars , Encoding , Chars }, State ) ->
send (State , ? COPY_DATA , encode_chars (Encoding , Chars ));
handle_io_request ({put_chars , Encoding , Mod , Fun , Args }, State ) ->
try apply (Mod , Fun , Args ) of
Chars when is_binary (Chars );
is_list (Chars ) ->
handle_io_request ({put_chars , Encoding , Chars }, State );
Other ->
{error , {fun_return_not_characters , Other }}
catch ? WITH_STACKTRACE (T , R , S )
{error , {fun_exception , {T , R , S }}}
end ;
handle_io_request ({setopts , _ }, _State ) ->
{error , request };
handle_io_request (getopts , _State ) ->
{error , request };
handle_io_request ({requests , Requests }, State ) ->
try_requests (Requests , State , ok ).
try_requests ([Req | Requests ], State , _ ) ->
case handle_io_request (Req , State ) of
{error , _ } = Err ->
Err ;
Other ->
try_requests (Requests , State , Other )
end ;
try_requests ([], _ , LastRes ) ->
LastRes .
io_reply (Result , From , ReplyAs ) ->
From ! {io_reply , ReplyAs , Result }.
% % @doc Handler for `copy_send_rows' API
% %
% % Only supports binary protocol right now.
% % But, in theory, can be used for text / csv formats as well, but we would need to add
% % some more callbacks to `epgsql_type' behaviour (eg, `encode_text')
handle_copy_send_rows (_Rows , Handler , _CopyState , _State ) when Handler =/= on_copy_from_stdin ->
{error , not_in_copy_mode };
handle_copy_send_rows (_ , _ , # copy {format = Format }, _ ) when Format =/= binary ->
% % copy_send_rows only supports "binary" format
{error , not_binary_format };
handle_copy_send_rows (_ , _ , # copy {last_error = LastError }, _ ) when LastError =/= undefined ->
% % server already reported error in data stream asynchronously
{error , LastError };
handle_copy_send_rows (Rows , _ , # copy {binary_types = Types }, State ) ->
Data = [epgsql_wire :encode_copy_row (Values , Types , get_codec (State ))
|| Values <- Rows ],
ok = send (State , ? COPY_DATA , Data ).
encode_chars (_ , Bin ) when is_binary (Bin ) ->
Bin ;
encode_chars (unicode , Chars ) when is_list (Chars ) ->
unicode :characters_to_binary (Chars );
encode_chars (latin1 , Chars ) when is_list (Chars ) ->
unicode :characters_to_binary (Chars , latin1 ).
to_binary (B ) when is_binary (B ) -> B ;
to_binary (L ) when is_list (L ) -> list_to_binary (L ).
Expand Down
Expand Up
@@ -547,12 +649,31 @@ on_message(?NOTIFICATION, <<Pid:?int32, Strings/binary>>, State) ->
on_message (Msg , Payload , State ) ->
command_handle_message (Msg , Payload , State ).
% % @doc Handle "copy subprotocol" for COPY .. FROM STDIN
% %
% % Activated by `epgsql_cmd_copy_from_stdin', deactivated by `epgsql_cmd_copy_done' or error
on_copy_from_stdin (? READY_FOR_QUERY , <<Status :8 >>,
# state {subproto_state = # copy {last_error = Err ,
initiator = Pid }} = State ) when Err =/= undefined ->
% % Reporting error from here and not from ?ERROR so it's easier to be in sync state
Pid ! {epgsql , self (), {error , Err }},
{noreply , State # state {subproto_state = undefined ,
handler = on_message ,
txstatus = Status }};
on_copy_from_stdin (? ERROR , Err , # state {subproto_state = SubState } = State ) ->
Reason = epgsql_wire :decode_error (Err ),
{noreply , State # state {subproto_state = SubState # copy {last_error = Reason }}};
on_copy_from_stdin (M , Data , Sock ) when M == ? NOTICE ;
M == ? NOTIFICATION ;
M == ? PARAMETER_STATUS ->
on_message (M , Data , Sock ).
% % CopyData for Replication mode
on_replication (? COPY_DATA , <<? PRIMARY_KEEPALIVE_MESSAGE :8 , LSN :? int64 , _Timestamp :? int64 , ReplyRequired :8 >>,
# state {repl = # repl {last_flushed_lsn = LastFlushedLSN ,
last_applied_lsn = LastAppliedLSN ,
align_lsn = AlignLsn } = Repl } = State ) ->
# state {subproto_state = # repl {last_flushed_lsn = LastFlushedLSN ,
last_applied_lsn = LastAppliedLSN ,
align_lsn = AlignLsn } = Repl } = State ) ->
Repl1 =
case ReplyRequired of
1 when AlignLsn ->
Expand All
@@ -569,14 +690,14 @@ on_replication(?COPY_DATA, <<?PRIMARY_KEEPALIVE_MESSAGE:8, LSN:?int64, _Timestam
Repl # repl {feedback_required = true ,
last_received_lsn = LSN }
end ,
{noreply , State # state {repl = Repl1 }};
{noreply , State # state {subproto_state = Repl1 }};
% % CopyData for Replication mode
on_replication (? COPY_DATA , <<? X_LOG_DATA , StartLSN :? int64 , EndLSN :? int64 ,
_Timestamp :? int64 , WALRecord /binary >>,
# state {repl = Repl } = State ) ->
# state {subproto_state = Repl } = State ) ->
Repl1 = handle_xlog_data (StartLSN , EndLSN , WALRecord , Repl ),
{noreply , State # state {repl = Repl1 }};
{noreply , State # state {subproto_state = Repl1 }};
on_replication (? ERROR , Err , State ) ->
Reason = epgsql_wire :decode_error (Err ),
{stop , {error , Reason }, State };
Expand Down