Commit

Merge pull request #14 from HernanRivasAcosta/master
Parity
HernanRivasAcosta committed Sep 16, 2014
2 parents (5beada3 + 7d99d74), commit 77dceb1
Showing 8 changed files with 147 additions and 330 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -9,3 +9,4 @@ log
*.log
*.bak
rebar
.rebar
11 changes: 6 additions & 5 deletions rel/kafkerl.app.config
@@ -3,12 +3,13 @@
{kafkerl, [%{gen_server_name, kafkerl_client},
{disabled, false},
{conn_config, [{brokers, [{"localhost", 9092}]},
{client_id, kafkerl_client},
{flush_every, 60},
{client_id, kafkerl_client}, % Sent to kafka
{max_broker_retries, 2},
{broker_tcp_timeout, 1000},
{max_metadata_retries, -1},
{retry_on_topic_error, true},
{metadata_tcp_timeout, 1000}]},
{topics, [test1, test2, test3]},
{buffer, [{max_queue_size, 4}]}]}].
{metadata_tcp_timeout, 1000},
{max_queue_size, 20}, % In items, per topic/partition
{max_time_queued, 30} % In seconds
]},
{topics, [test1, test2, test3]}]}].
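
The separate buffer section is gone; queue limits now live in conn_config as max_queue_size (items, per topic/partition) and max_time_queued (seconds). Below is a minimal sketch, not part of this commit, of how a caller might read the two new options from the kafkerl application environment; the module name and the 20/30 fallbacks are illustrative only, mirroring the sample values above.

-module(kafkerl_config_example).
-export([queue_limits/0]).

%% Illustrative only: reads the new queue limits from conn_config.
%% The fallbacks mirror the sample config above, not library defaults.
queue_limits() ->
  {ok, ConnConfig} = application:get_env(kafkerl, conn_config),
  MaxQueueSize  = proplists:get_value(max_queue_size, ConnConfig, 20),
  MaxTimeQueued = proplists:get_value(max_time_queued, ConnConfig, 30),
  {MaxQueueSize, MaxTimeQueued}.
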
1 change: 0 additions & 1 deletion src/kafkerl.erl
@@ -52,7 +52,6 @@ subscribe(Name, Callback) ->
subscribe(Name, Callback, Filter) ->
kafkerl_connector:subscribe(Name, Callback, Filter).


-spec unsubscribe(callback()) -> ok.
unsubscribe(Callback) ->
unsubscribe(?MODULE, Callback).
143 changes: 75 additions & 68 deletions src/kafkerl_broker_connection.erl
@@ -4,7 +4,7 @@
-behaviour(gen_server).

%% API
-export([send/2, flush/1, kill/1]).
-export([add_buffer/2, clear_buffers/1]).
% Only for internal use
-export([connect/5]).
% Supervisors
@@ -20,6 +20,7 @@
-type start_link_response() :: {ok, atom(), pid()} | ignore | {error, any()}.

-record(state, {name = undefined :: atom(),
buffers = [] :: [atom()],
conn_idx = undefined :: conn_idx(),
client_id = undefined :: binary(),
socket = undefined :: undefined | port(),
@@ -30,7 +31,9 @@
max_retries = 0 :: integer(),
retry_interval = 0 :: integer(),
request_number = 0 :: integer(),
pending_requests = [] :: [integer()]}).
pending_requests = [] :: [integer()],
max_time_queued = 0 :: integer(),
ets = undefined :: atom()}).
-type state() :: #state{}.

%%==============================================================================
@@ -39,7 +42,8 @@
-spec start_link(conn_idx(), pid(), socket_address(), any()) ->
start_link_response().
start_link(Id, Connector, Address, Config) ->
Name = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ integer_to_list(Id)),
NameStr = atom_to_list(?MODULE) ++ "_" ++ integer_to_list(Id),
Name = list_to_atom(NameStr),
Params = [Id, Connector, Address, Config, Name],
case gen_server:start_link({local, Name}, ?MODULE, Params, []) of
{ok, Pid} ->
@@ -48,35 +52,26 @@ start_link(Id, Connector, Address, Config) ->
Other
end.

-spec send(server_ref(), basic_message()) -> ok.
send(ServerRef, Message) ->
gen_server:call(ServerRef, {send, Message}).
-spec add_buffer(server_ref(), atom()) -> ok.
add_buffer(ServerRef, Buffer) ->
gen_server:call(ServerRef, {add_buffer, Buffer}).

-spec flush(server_ref()) -> ok.
flush(ServerRef) ->
gen_server:call(ServerRef, {flush}).

-spec kill(server_ref()) -> ok.
kill(ServerRef) ->
gen_server:call(ServerRef, {kill}).
-spec clear_buffers(server_ref()) -> ok.
clear_buffers(ServerRef) ->
gen_server:call(ServerRef, {clear_buffers}).
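
The send/flush/kill API is replaced by buffer registration: the connector hands each broker connection the names of the ETS-backed buffers it should drain when flushing. A hypothetical caller-side sketch follows; the buffer naming scheme is invented for illustration and is not kafkerl's actual convention.

%% Hypothetical usage sketch: register a per-topic/partition buffer with a
%% broker connection so its contents are drained on the next flush.
register_topic_buffer(BrokerConnection, Topic, Partition) ->
  Suffix = lists:flatten(io_lib:format("~s_~p", [Topic, Partition])),
  BufferName = list_to_atom("kafkerl_buffer_" ++ Suffix),
  ok = kafkerl_broker_connection:add_buffer(BrokerConnection, BufferName),
  BufferName.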

%%==============================================================================
%% gen_server callbacks
%%==============================================================================
-spec handle_call(any(), any(), state()) -> {reply, ok, state()}.
handle_call({send, Message}, _From, State) ->
handle_send(Message, State);
handle_call({flush}, _From, State) ->
handle_flush(State);
handle_call({kill}, _From, State = #state{name = Name}) ->
% TODO: handle the potentially buffered messages
lager:info("~p stopped by it's parent connector", [Name]),
{stop, normal, ok, State}.
handle_call({add_buffer, Buffer}, _From, State = #state{buffers = Buffers}) ->
{reply, ok, State#state{buffers = [Buffer| Buffers]}};
handle_call({clear_buffers}, _From, State) ->
{reply, ok, State#state{buffers = []}}.

-spec handle_info(any(), state()) -> {noreply, state()}.
handle_info({connected, Socket}, State) ->
{reply, ok, NewState} = handle_flush(State#state{socket = Socket}),
{noreply, NewState};
handle_flush(State#state{socket = Socket});
handle_info(connection_timeout, State) ->
{stop, {error, unable_to_connect}, State};
handle_info({tcp_closed, _Socket}, State = #state{address = {Host, Port}}) ->
@@ -86,6 +81,9 @@ handle_info({tcp_closed, _Socket}, State = #state{address = {Host, Port}}) ->
handle_info({tcp, _Socket, Bin}, State) ->
NewState = handle_tcp_data(Bin, State),
{noreply, NewState};
handle_info({flush, Time}, State) ->
{ok, _Tref} = queue_flush(Time),
handle_flush(State);
handle_info(Msg, State) ->
lager:notice("unexpected info message received: ~p on ~p", [Msg, State]),
{noreply, State}.
@@ -105,15 +103,21 @@ init([Id, Connector, Address, Config, Name]) ->
Schema = [{tcp_options, [any], {default, []}},
{retry_interval, positive_integer, {default, 1000}},
{max_retries, positive_integer, {default, 3}},
{client_id, binary, {default, <<"kafkerl_client">>}}],
{client_id, binary, {default, <<"kafkerl_client">>}},
{max_time_queued, positive_integer, {default, 30}}],
case normalizerl:normalize_proplist(Schema, Config) of
{ok, [TCPOpts, RetryInterval, MaxRetries, ClientId]} ->
{ok, [TCPOpts, RetryInterval, MaxRetries, ClientId, MaxTimeQueued]} ->
NewTCPOpts = kafkerl_utils:get_tcp_options(TCPOpts),
EtsName = list_to_atom(atom_to_list(Name) ++ "_ets"),
ets:new(EtsName, [named_table, public, {write_concurrency, true},
{read_concurrency, true}]),
State = #state{conn_idx = Id, tcp_options = NewTCPOpts, address = Address,
max_retries = MaxRetries, retry_interval = RetryInterval,
connector = Connector, client_id = ClientId, name = Name},
connector = Connector, client_id = ClientId, name = Name,
max_time_queued = MaxTimeQueued, ets = EtsName},
Params = [self(), NewTCPOpts, Address, RetryInterval, MaxRetries],
_Pid = spawn_link(?MODULE, connect, Params),
{ok, _Tref} = queue_flush(MaxTimeQueued),
{ok, State};
{errors, Errors} ->
lists:foreach(fun(E) ->
@@ -122,34 +126,31 @@ init([Id, Connector, Address, Config, Name]) ->
{stop, bad_config}
end.
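
init/1 validates its options through normalizerl. The sketch below shows the contract this code appears to assume, inferred from the pattern match above: values come back in schema order on success, or {errors, ...} on failure. The example values are illustrative.

%% Assumed normalizerl behaviour, inferred from the pattern match in init/1:
%% the result list follows the order of the schema entries.
schema_example() ->
  Schema = [{retry_interval, positive_integer, {default, 1000}},
            {max_time_queued, positive_integer, {default, 30}}],
  Config = [{retry_interval, 500}],
  {ok, [RetryInterval, MaxTimeQueued]} =
    normalizerl:normalize_proplist(Schema, Config),
  {RetryInterval, MaxTimeQueued}. % {500, 30} with the values above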

handle_send(Message, State = #state{conn_idx = ConnIdx}) ->
case kafkerl_buffer:buffer(ConnIdx, Message) of
{ok, true = _ShouldFlush} ->
handle_flush(State);
{ok, false = _ShouldFlush} ->
{reply, ok, State}
end.

handle_flush(State = #state{socket = undefined}) ->
{reply, ok, State};
handle_flush(State = #state{socket = Socket, conn_idx = ConnIdx,
client_id = ClientId}) ->
{noreply, State};
handle_flush(State = #state{socket = Socket, conn_idx = ConnIdx, ets = EtsName,
client_id = ClientId, buffers = Buffers}) ->
{ok, CorrelationId, NewState} = build_correlation_id(State),
BuiltRequest = kafkerl_buffer:build_request(ConnIdx, ClientId, CorrelationId,
?COMPRESSION_NONE),
case BuiltRequest of
{ok, void} ->
{reply, ok, NewState};
{ok, IOList} ->
lager:debug("Sending ~p", [IOList]),
case gen_tcp:send(Socket, IOList) of
% TODO: Maybe buffer all this messages in case something goes wrong
AllMessages = get_all_messages(Buffers),
case kafkerl_utils:merge_messages(AllMessages) of
[] ->
{noreply, NewState};
MergedMessages ->
Request = kafkerl_protocol:build_produce_request(MergedMessages,
ClientId,
CorrelationId,
?COMPRESSION_NONE),
true = ets:insert_new(EtsName, {CorrelationId, MergedMessages}),
lager:debug("Sending ~p", [Request]),
case gen_tcp:send(Socket, Request) of
{error, Reason} ->
lager:warning("unable to write to socket, reason: ~p", [Reason]),
gen_tcp:close(Socket),
{reply, ok, handle_tcp_close(NewState)};
{noreply, handle_tcp_close(NewState)};
ok ->
lager:debug("message ~p sent", [CorrelationId]),
{reply, ok, NewState}
{noreply, NewState}
end
end.
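
handle_flush/1 parks the messages of every produce request in the connection's ETS table under the request's correlation id, and handle_tcp_data/2 below takes that entry back out when the matching response arrives. The same bookkeeping, isolated as a small sketch using only the calls shown in this diff:

%% In-flight request bookkeeping: store on send, take on response.
track_request(EtsName, CorrelationId, Messages) ->
  true = ets:insert_new(EtsName, {CorrelationId, Messages}),
  ok.

take_request(EtsName, CorrelationId) ->
  [{CorrelationId, Messages}] = ets:lookup(EtsName, CorrelationId),
  true = ets:delete(EtsName, CorrelationId),
  Messages.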

@@ -162,29 +163,24 @@ handle_tcp_close(State = #state{retry_interval = RetryInterval,
_Pid = spawn_link(?MODULE, connect, Params),
State#state{socket = undefined}.

handle_tcp_data(Bin, State = #state{connector = Connector}) ->
handle_tcp_data(Bin, State = #state{connector = Connector, ets = EtsName}) ->
case kafkerl_protocol:parse_produce_response(Bin) of
{ok, CorrelationId, Topics} ->
case kafkerl_buffer:delete_saved_request(CorrelationId) of
{error, Reason} ->
lager:error("Unable to retrieve the saved request #~p, reason: ~p",
[CorrelationId, Reason]),
[{CorrelationId, Messages}] = ets:lookup(EtsName, CorrelationId),
ets:delete(EtsName, CorrelationId),
{Errors, Successes} = separate_errors(Topics),
% First, send the offsets and messages that were delivered
spawn(fun() ->
notify_success_to_connector(Successes, Messages, Connector)
end),
case handle_errors(Errors, Messages) of
ignore ->
State;
{ok, Messages} ->
{Errors, Successes} = separate_errors(Topics),
% First, send the offsets and messages that were delivered
spawn(fun() ->
notify_success_to_connector(Successes, Messages, Connector)
end),
case handle_errors(Errors, Messages) of
ignore ->
State;
{request_metadata, MessagesToResend} ->
kafkerl_connector:request_metadata(Connector),
F = fun(M) -> kafkerl_connector:send(Connector, M) end,
ok = lists:foreach(F, MessagesToResend),
State
end
{request_metadata, MessagesToResend} ->
kafkerl_connector:request_metadata(Connector),
F = fun(M) -> kafkerl_connector:send(Connector, M) end,
ok = lists:foreach(F, MessagesToResend),
State
end;
Other ->
lager:critical("unexpected response when parsing message: ~p", [Other]),
@@ -289,4 +285,15 @@ connect(Pid, TCPOpts, {Host, Port} = Address, Timeout, Retries) ->
[Host, Port, Reason, NewRetries]),
timer:sleep(Timeout),
connect(Pid, TCPOpts, Address, Timeout, NewRetries)
end.
end.

queue_flush(Time) ->
timer:send_after(Time * 1000, {flush, Time}).
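
queue_flush/1 arms a one-shot timer whose {flush, Time} message is re-armed by the handle_info clause above before flushing, so each connection flushes at least once every max_time_queued seconds. A self-contained sketch of that re-arming pattern (module name and output are illustrative only):

%% Standalone illustration of a self-re-arming flush timer.
-module(flush_timer_example).
-export([start/1, loop/0]).

start(IntervalSeconds) ->
  Pid = spawn(?MODULE, loop, []),
  {ok, _TRef} = timer:send_after(IntervalSeconds * 1000, Pid,
                                 {flush, IntervalSeconds}),
  Pid.

loop() ->
  receive
    {flush, Interval} ->
      %% re-arm first, then do the flush work
      {ok, _TRef} = timer:send_after(Interval * 1000, {flush, Interval}),
      io:format("periodic flush (~p s)~n", [Interval]),
      loop()
  end.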

get_all_messages(Buffers) ->
get_all_messages(Buffers, []).

get_all_messages([], Acc) ->
Acc;
get_all_messages([H | T], Acc) ->
get_all_messages(T, Acc ++ ets_buffer:read_all(H)).
