diff --git a/.gitignore b/.gitignore index 95fe271..9b09230 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ log *.log *.bak rebar +.rebar diff --git a/rel/kafkerl.app.config b/rel/kafkerl.app.config index f211301..3f93e90 100644 --- a/rel/kafkerl.app.config +++ b/rel/kafkerl.app.config @@ -3,12 +3,13 @@ {kafkerl, [%{gen_server_name, kafkerl_client}, {disabled, false}, {conn_config, [{brokers, [{"localhost", 9092}]}, - {client_id, kafkerl_client}, - {flush_every, 60}, + {client_id, kafkerl_client}, % Sent to kafka {max_broker_retries, 2}, {broker_tcp_timeout, 1000}, {max_metadata_retries, -1}, {retry_on_topic_error, true}, - {metadata_tcp_timeout, 1000}]}, - {topics, [test1, test2, test3]}, - {buffer, [{max_queue_size, 4}]}]}]. \ No newline at end of file + {metadata_tcp_timeout, 1000}, + {max_queue_size, 20}, % In items, per topic/partition + {max_time_queued, 30} % In seconds + ]}, + {topics, [test1, test2, test3]}]}]. \ No newline at end of file diff --git a/src/kafkerl.erl b/src/kafkerl.erl index 31f39ce..333b84b 100644 --- a/src/kafkerl.erl +++ b/src/kafkerl.erl @@ -52,7 +52,6 @@ subscribe(Name, Callback) -> subscribe(Name, Callback, Filter) -> kafkerl_connector:subscribe(Name, Callback, Filter). - -spec unsubscribe(callback()) -> ok. unsubscribe(Callback) -> unsubscribe(?MODULE, Callback). diff --git a/src/kafkerl_broker_connection.erl b/src/kafkerl_broker_connection.erl index 3d93422..a6c6fb2 100644 --- a/src/kafkerl_broker_connection.erl +++ b/src/kafkerl_broker_connection.erl @@ -4,7 +4,7 @@ -behaviour(gen_server). %% API --export([send/2, flush/1, kill/1]). +-export([add_buffer/2, clear_buffers/1]). % Only for internal use -export([connect/5]). % Supervisors @@ -20,6 +20,7 @@ -type start_link_response() :: {ok, atom(), pid()} | ignore | {error, any()}. 
-record(state, {name = undefined :: atom(), + buffers = [] :: [atom()], conn_idx = undefined :: conn_idx(), client_id = undefined :: binary(), socket = undefined :: undefined | port(), @@ -30,7 +31,9 @@ max_retries = 0 :: integer(), retry_interval = 0 :: integer(), request_number = 0 :: integer(), - pending_requests = [] :: [integer()]}). + pending_requests = [] :: [integer()], + max_time_queued = 0 :: integer(), + ets = undefined :: atom()}). -type state() :: #state{}. %%============================================================================== @@ -39,7 +42,8 @@ -spec start_link(conn_idx(), pid(), socket_address(), any()) -> start_link_response(). start_link(Id, Connector, Address, Config) -> - Name = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ integer_to_list(Id)), + NameStr = atom_to_list(?MODULE) ++ "_" ++ integer_to_list(Id), + Name = list_to_atom(NameStr), Params = [Id, Connector, Address, Config, Name], case gen_server:start_link({local, Name}, ?MODULE, Params, []) of {ok, Pid} -> @@ -48,35 +52,26 @@ start_link(Id, Connector, Address, Config) -> Other end. --spec send(server_ref(), basic_message()) -> ok. -send(ServerRef, Message) -> - gen_server:call(ServerRef, {send, Message}). +-spec add_buffer(server_ref(), atom()) -> ok. +add_buffer(ServerRef, Buffer) -> + gen_server:call(ServerRef, {add_buffer, Buffer}). --spec flush(server_ref()) -> ok. -flush(ServerRef) -> - gen_server:call(ServerRef, {flush}). - --spec kill(server_ref()) -> ok. -kill(ServerRef) -> - gen_server:call(ServerRef, {kill}). +-spec clear_buffers(server_ref()) -> ok. +clear_buffers(ServerRef) -> + gen_server:call(ServerRef, {clear_buffers}). %%============================================================================== %% gen_server callbacks %%============================================================================== -spec handle_call(any(), any(), state()) -> {reply, ok, state()}. 
-handle_call({send, Message}, _From, State) -> - handle_send(Message, State); -handle_call({flush}, _From, State) -> - handle_flush(State); -handle_call({kill}, _From, State = #state{name = Name}) -> - % TODO: handle the potentially buffered messages - lager:info("~p stopped by it's parent connector", [Name]), - {stop, normal, ok, State}. +handle_call({add_buffer, Buffer}, _From, State = #state{buffers = Buffers}) -> + {reply, ok, State#state{buffers = [Buffer| Buffers]}}; +handle_call({clear_buffers}, _From, State) -> + {reply, ok, State#state{buffers = []}}. -spec handle_info(any(), state()) -> {noreply, state()}. handle_info({connected, Socket}, State) -> - {reply, ok, NewState} = handle_flush(State#state{socket = Socket}), - {noreply, NewState}; + handle_flush(State#state{socket = Socket}); handle_info(connection_timeout, State) -> {stop, {error, unable_to_connect}, State}; handle_info({tcp_closed, _Socket}, State = #state{address = {Host, Port}}) -> @@ -86,6 +81,9 @@ handle_info({tcp_closed, _Socket}, State = #state{address = {Host, Port}}) -> handle_info({tcp, _Socket, Bin}, State) -> NewState = handle_tcp_data(Bin, State), {noreply, NewState}; +handle_info({flush, Time}, State) -> + {ok, _Tref} = queue_flush(Time), + handle_flush(State); handle_info(Msg, State) -> lager:notice("unexpected info message received: ~p on ~p", [Msg, State]), {noreply, State}. 
@@ -105,15 +103,21 @@ init([Id, Connector, Address, Config, Name]) -> Schema = [{tcp_options, [any], {default, []}}, {retry_interval, positive_integer, {default, 1000}}, {max_retries, positive_integer, {default, 3}}, - {client_id, binary, {default, <<"kafkerl_client">>}}], + {client_id, binary, {default, <<"kafkerl_client">>}}, + {max_time_queued, positive_integer, {default, 30}}], case normalizerl:normalize_proplist(Schema, Config) of - {ok, [TCPOpts, RetryInterval, MaxRetries, ClientId]} -> + {ok, [TCPOpts, RetryInterval, MaxRetries, ClientId, MaxTimeQueued]} -> NewTCPOpts = kafkerl_utils:get_tcp_options(TCPOpts), + EtsName = list_to_atom(atom_to_list(Name) ++ "_ets"), + ets:new(EtsName, [named_table, public, {write_concurrency, true}, + {read_concurrency, true}]), State = #state{conn_idx = Id, tcp_options = NewTCPOpts, address = Address, max_retries = MaxRetries, retry_interval = RetryInterval, - connector = Connector, client_id = ClientId, name = Name}, + connector = Connector, client_id = ClientId, name = Name, + max_time_queued = MaxTimeQueued, ets = EtsName}, Params = [self(), NewTCPOpts, Address, RetryInterval, MaxRetries], _Pid = spawn_link(?MODULE, connect, Params), + {ok, _Tref} = queue_flush(MaxTimeQueued), {ok, State}; {errors, Errors} -> lists:foreach(fun(E) -> @@ -122,34 +126,31 @@ init([Id, Connector, Address, Config, Name]) -> {stop, bad_config} end. -handle_send(Message, State = #state{conn_idx = ConnIdx}) -> - case kafkerl_buffer:buffer(ConnIdx, Message) of - {ok, true = _ShouldFlush} -> - handle_flush(State); - {ok, false = _ShouldFlush} -> - {reply, ok, State} - end. 
- handle_flush(State = #state{socket = undefined}) -> - {reply, ok, State}; -handle_flush(State = #state{socket = Socket, conn_idx = ConnIdx, - client_id = ClientId}) -> + {noreply, State}; +handle_flush(State = #state{socket = Socket, conn_idx = ConnIdx, ets = EtsName, + client_id = ClientId, buffers = Buffers}) -> {ok, CorrelationId, NewState} = build_correlation_id(State), - BuiltRequest = kafkerl_buffer:build_request(ConnIdx, ClientId, CorrelationId, - ?COMPRESSION_NONE), - case BuiltRequest of - {ok, void} -> - {reply, ok, NewState}; - {ok, IOList} -> - lager:debug("Sending ~p", [IOList]), - case gen_tcp:send(Socket, IOList) of + % TODO: Maybe buffer all this messages in case something goes wrong + AllMessages = get_all_messages(Buffers), + case kafkerl_utils:merge_messages(AllMessages) of + [] -> + {noreply, NewState}; + MergedMessages -> + Request = kafkerl_protocol:build_produce_request(MergedMessages, + ClientId, + CorrelationId, + ?COMPRESSION_NONE), + true = ets:insert_new(EtsName, {CorrelationId, MergedMessages}), + lager:debug("Sending ~p", [Request]), + case gen_tcp:send(Socket, Request) of {error, Reason} -> lager:warning("unable to write to socket, reason: ~p", [Reason]), gen_tcp:close(Socket), - {reply, ok, handle_tcp_close(NewState)}; + {noreply, handle_tcp_close(NewState)}; ok -> lager:debug("message ~p sent", [CorrelationId]), - {reply, ok, NewState} + {noreply, NewState} end end. @@ -162,29 +163,24 @@ handle_tcp_close(State = #state{retry_interval = RetryInterval, _Pid = spawn_link(?MODULE, connect, Params), State#state{socket = undefined}. 
-handle_tcp_data(Bin, State = #state{connector = Connector}) -> +handle_tcp_data(Bin, State = #state{connector = Connector, ets = EtsName}) -> case kafkerl_protocol:parse_produce_response(Bin) of {ok, CorrelationId, Topics} -> - case kafkerl_buffer:delete_saved_request(CorrelationId) of - {error, Reason} -> - lager:error("Unable to retrieve the saved request #~p, reason: ~p", - [CorrelationId, Reason]), + [{CorrelationId, Messages}] = ets:lookup(EtsName, CorrelationId), + ets:delete(EtsName, CorrelationId), + {Errors, Successes} = separate_errors(Topics), + % First, send the offsets and messages that were delivered + spawn(fun() -> + notify_success_to_connector(Successes, Messages, Connector) + end), + case handle_errors(Errors, Messages) of + ignore -> State; - {ok, Messages} -> - {Errors, Successes} = separate_errors(Topics), - % First, send the offsets and messages that were delivered - spawn(fun() -> - notify_success_to_connector(Successes, Messages, Connector) - end), - case handle_errors(Errors, Messages) of - ignore -> - State; - {request_metadata, MessagesToResend} -> - kafkerl_connector:request_metadata(Connector), - F = fun(M) -> kafkerl_connector:send(Connector, M) end, - ok = lists:foreach(F, MessagesToResend), - State - end + {request_metadata, MessagesToResend} -> + kafkerl_connector:request_metadata(Connector), + F = fun(M) -> kafkerl_connector:send(Connector, M) end, + ok = lists:foreach(F, MessagesToResend), + State end; Other -> lager:critical("unexpected response when parsing message: ~p", [Other]), @@ -289,4 +285,15 @@ connect(Pid, TCPOpts, {Host, Port} = Address, Timeout, Retries) -> [Host, Port, Reason, NewRetries]), timer:sleep(Timeout), connect(Pid, TCPOpts, Address, Timeout, NewRetries) - end. \ No newline at end of file + end. + +queue_flush(Time) -> + timer:send_after(Time * 1000, {flush, Time}). + +get_all_messages(Buffers) -> + get_all_messages(Buffers, []). 
+ +get_all_messages([], Acc) -> + Acc; +get_all_messages([H | T], Acc) -> + get_all_messages(T, Acc ++ ets_buffer:read_all(H)). \ No newline at end of file diff --git a/src/kafkerl_buffer.erl b/src/kafkerl_buffer.erl deleted file mode 100644 index cb0dda6..0000000 --- a/src/kafkerl_buffer.erl +++ /dev/null @@ -1,197 +0,0 @@ --module(kafkerl_buffer). --author('hernanrivasacosta@gmail.com'). - --behaviour(gen_server). - -%% API --export([buffer/2, build_request/4]). --export([get_saved_request/1, delete_saved_request/1]). -% Supervisors --export([start_link/1]). -% gen_server callbacks --export([init/1, terminate/2, code_change/3, - handle_call/3, handle_cast/2, handle_info/2]). - --include("kafkerl.hrl"). - --type start_link_response() :: {ok, pid()} | ignore | error(). - --record(state, {max_queue_size = 0 :: non_neg_integer(), - message_buffer = [] :: [{integer(), basic_message()}], - saved_requests = [] :: [{correlation_id(), - iodata(), - [basic_message()]}]}). --type state() :: #state{}. - -%%============================================================================== -%% API -%%============================================================================== --spec start_link(any()) -> start_link_response(). -start_link(Config) -> - gen_server:start_link({local, ?ETS_BUFFER}, ?MODULE, [Config], []). - -% Returns true if the buffer should be flushed --spec buffer(atom(), basic_message()) -> {ok, boolean()}. -buffer(Broker, Message) -> - gen_server:call(?ETS_BUFFER, {buffer, Broker, Message}). - --spec build_request(atom(), client_id(), correlation_id(), compression()) -> - {ok, iodata() | void}. 
-build_request(Broker, ClientId, CorrelationId, Compression) -> - case gen_server:call(?ETS_BUFFER, {get_buffer, Broker}) of - {ok, []} -> - {ok, void}; - {ok, Messages} -> - MergedMessages = merge_topics(Messages), - Request = kafkerl_protocol:build_produce_request(MergedMessages, - ClientId, - CorrelationId, - Compression), - SavedRequest = {CorrelationId, Request, Messages}, - ok = gen_server:call(?ETS_BUFFER, {save_request, SavedRequest}), - {ok, Request} - end. - --spec get_saved_request(correlation_id()) -> - {ok, iodata(), [basic_message()]} | {error, not_found}. -get_saved_request(CorrelationId) -> - gen_server:call(?ETS_BUFFER, {get_saved_request, CorrelationId}). - --spec delete_saved_request(correlation_id()) -> {ok, [basic_message()]} | - {error, not_found}. -delete_saved_request(CorrelationId) -> - gen_server:call(?ETS_BUFFER, {delete_saved_request, CorrelationId}). - -% gen_server callbacks --spec handle_call(any(), any(), state()) -> - {reply, any(), state()}. -handle_call({buffer, Broker, Message}, _From, State) -> - handle_buffer(Broker, Message, State); -handle_call({get_buffer, Broker}, _From, State) -> - handle_get_buffer(Broker, State); -handle_call({save_request, Request}, _From, State) -> - handle_save_request(Request, State); -handle_call({get_saved_request, CorrelationId}, _From, State) -> - handle_get_saved_request(CorrelationId, State); -handle_call({delete_saved_request, CorrelationId}, _From, State) -> - handle_delete_saved_request(CorrelationId, State). - -% Boilerplate --spec handle_info(any(), state()) -> {noreply, state()}. -handle_info(_Msg, State) -> {noreply, State}. --spec handle_cast(any(), state()) -> {noreply, state()}. -handle_cast(_Msg, State) -> {noreply, State}. --spec terminate(atom(), state()) -> ok. -terminate(_Reason, _State) -> ok. --spec code_change(string(), state(), any()) -> {ok, state()}. -code_change(_OldVsn, State, _Extra) -> {ok, State}. 
- -%%============================================================================== -%% Handlers -%%============================================================================== -init([Config]) -> - Schema = [{max_queue_size, non_neg_integer, {default, 10}}], - case normalizerl:normalize_proplist(Schema, Config) of - {ok, [MaxQueueSize]} -> - {ok, #state{max_queue_size = MaxQueueSize}}; - {errors, Errors} -> - lists:foreach(fun(E) -> - lager:critical("cache config error ~p", [E]) - end, Errors), - {stop, bad_config} - end. - -handle_buffer(Broker, Message, State = #state{message_buffer = Messages, - max_queue_size = MaxQueueSize}) -> - FormattedMessage = format_message(Message), - F = fun(undefined) -> {1, [FormattedMessage]}; - (L) -> {length(L) + 1, [FormattedMessage | L]} end, - {NewMessagesCount, NewMessages} = update_proplist(Broker, F, Messages), - ShouldFlush = NewMessagesCount >= MaxQueueSize, - {reply, {ok, ShouldFlush}, State#state{message_buffer = NewMessages}}. - -handle_get_buffer(Broker, State = #state{message_buffer = Messages}) -> - case lists:keytake(Broker, 1, Messages) of - false -> - {reply, {ok, []}, State}; - {value, {_, Value}, NewMessages} -> - NewState = State#state{message_buffer = NewMessages}, - {reply, {ok, lists:reverse(Value)}, NewState} - end. - -handle_save_request(Request, State = #state{saved_requests = Requests}) -> - {reply, ok, State#state{saved_requests = [Request | Requests]}}. - -handle_get_saved_request(CorrelationId, State) -> - Response = case lists:keyfind(CorrelationId, 1, State#state.message_buffer) of - {CorrelationId, Request, Messages} -> - {ok, {Request, Messages}}; - _ -> - {error, not_found} - end, - {reply, Response, State}. 
- -handle_delete_saved_request(CorrelationId, - State = #state{saved_requests = Requests}) -> - case lists:keytake(CorrelationId, 1, Requests) of - false -> - {reply, {error, not_found}, State}; - {value, {CorrelationId, _Bin, Messages}, NewMessageBuffer} -> - {reply, {ok, Messages}, State#state{message_buffer = NewMessageBuffer}} - end. - -%%============================================================================== -%% Utils -%%============================================================================== -format_message({Topic, Partitions}) when is_list(Partitions) -> - {Topic, Partitions}; -format_message({Topic, Partition}) -> - {Topic, [Partition]}; -format_message({Topic, Partition, Messages}) when is_list(Messages) -> - {Topic, [{Partition, Messages}]}; -format_message({Topic, Partition, Message}) -> - {Topic, [{Partition, [Message]}]}. - -update_proplist(Key, Fun, Proplist) -> - update_proplist(Key, Fun, Proplist, []). -update_proplist(Key, Fun, [], Acc) -> - {Count, List} = Fun(undefined), - {Count, lists:reverse([{Key, List} | Acc])}; -update_proplist(Key, Fun, [{Key, Value} | T], Acc) -> - {Count, List} = Fun(Value), - {Count, lists:reverse([{Key, List} | Acc], T)}; -update_proplist(Key, Fun, [H | T], Acc) -> - update_proplist(Key, Fun, T, [H | Acc]). - -merge_topics(Topics) -> - merge_topics(Topics, []). -merge_topics([], Acc) -> - Acc; -merge_topics([H | T], Acc) -> - merge_topics(T, merge_topic(H, Acc)). - -merge_topic(Topic, Topics) -> - merge_topic(Topic, Topics, []). -merge_topic(Topic, [], Acc) -> - lists:reverse([Topic | Acc]); -merge_topic({Topic, NewPartitions}, [{Topic, Partitions} | T], Acc) -> - MergedPartitions = merge_partitions(NewPartitions ++ Partitions), - lists:reverse(Acc, [{Topic, MergedPartitions} | T]); -merge_topic(Topic, [H | T], Acc) -> - merge_topic(Topic, T, [H | Acc]). - -merge_partitions(Partitions) -> - merge_partitions(Partitions, []). 
-merge_partitions([], Acc) -> - Acc; -merge_partitions([H | T], Acc) -> - merge_partitions(T, merge_partition(H, Acc)). - -merge_partition(Partition, Partitions) -> - merge_partition(Partition, Partitions, []). -merge_partition(Partition, [], Acc) -> - lists:reverse([Partition | Acc]); -merge_partition({Partition, NewMessages}, [{Partition, Messages} | T], Acc) -> - lists:reverse(Acc, [{Partition, NewMessages ++ Messages} | T]); -merge_partition(Topic, [H | T], Acc) -> - merge_partition(Topic, T, [H | Acc]). \ No newline at end of file diff --git a/src/kafkerl_connector.erl b/src/kafkerl_connector.erl index 37195f7..42d5917 100644 --- a/src/kafkerl_connector.erl +++ b/src/kafkerl_connector.erl @@ -46,8 +46,15 @@ start_link(Name, Config) -> send(ServerRef, Message) -> send(ServerRef, Message, 1000). -spec send(server_ref(), basic_message(), integer() | infinity) -> ok | error(). -send(ServerRef, Message, Timeout) -> - gen_server:call(ServerRef, {send, Message}, Timeout). +send(ServerRef, {Topic, Partition, Payload} = Message, Timeout) -> + Buffer = kafkerl_utils:buffer_name(Topic, Partition), + case ets_buffer:write(Buffer, Message) of + NewCount when is_integer(NewCount) -> + ok; + Error -> + lager:debug("unable to send message to ~p, reason: ~p", [Buffer, Error]), + gen_server:call(ServerRef, {send, Message}, Timeout) + end. -spec get_partitions(server_ref()) -> [{topic(), [partition()]}] | error(). 
get_partitions(ServerRef) -> @@ -109,7 +116,7 @@ handle_info(metadata_timeout, State) -> handle_info({metadata_updated, []}, State) -> % If the metadata arrived empty request it again {noreply, handle_request_metadata(State, [])}; -handle_info({metadata_updated, Mapping}, State = #state{pending = Pending}) -> +handle_info({metadata_updated, Mapping}, State) -> % Create the topic mapping (this also starts the broker connections) NewBrokerMapping = get_broker_mapping(Mapping, State), lager:debug("Refreshed topic mapping: ~p", [NewBrokerMapping]), @@ -121,13 +128,12 @@ handle_info({metadata_updated, Mapping}, State = #state{pending = Pending}) -> NewTopics = lists:sort([T || {T, _P} <- PartitionData]), NewKnownTopics = lists:umerge(NewTopics, State#state.known_topics), lager:debug("Known topics: ~p", [NewKnownTopics]), - NewState = State#state{broker_mapping = NewBrokerMapping, - callbacks = NewCallbacks, - known_topics = NewKnownTopics, - pending = []}, - F = fun(P) -> handle_send(P, NewState) end, - ok = lists:foreach(F, Pending), - {noreply, NewState}; + % Reverse the pending messages and try to send them again + RPending = lists:reverse(State#state.pending), + ok = lists:foreach(fun(P) -> send(self(), P) end, RPending), + {noreply, State#state{broker_mapping = NewBrokerMapping, pending = [], + callbacks = NewCallbacks, + known_topics = NewKnownTopics}}; handle_info(Msg, State) -> lager:notice("Unexpected info message received: ~p on ~p", [Msg, State]), {noreply, State}. @@ -177,41 +183,40 @@ init([Config]) -> {stop, bad_config} end. 
+handle_send(Message, State = #state{retry_on_topic_error = false}) -> + % The topic didn't exist, ignore + {Topic, _Partition, Payload} = Message, + lager:error("Dropping ~p sent to non existing topic ~p", [Payload, Topic]), + {reply, ok, State}; handle_send(Message, State = #state{broker_mapping = void, pending = Pending}) -> - % If we are waiting for the metadata, just save the message and move on - % TODO: Using the buffer instead of a list in the gen_server will be safer + % Maybe have a new buffer {reply, ok, State#state{pending = [Message | Pending]}}; -handle_send(Message, State = #state{broker_mapping = Mapping, - known_topics = KnownTopics, - retry_on_topic_error = RetryTopics}) -> +handle_send(Message, State = #state{broker_mapping = Mapping, pending = Pending, + known_topics = KnownTopics}) -> {Topic, Partition, Payload} = Message, - _ = case {lists:keyfind({Topic, Partition}, 1, Mapping), RetryTopics} of - {false, false} -> - % When retry topics is false, just fail - lager:error("Dropping ~p sent to non existing topic ~p", - [Payload, Topic]), - {error, invalid_topic}; - {false, true} -> - % We need to check if the topic is valid, if it is and the partition - % is invalid, then retrying is not going to work. Otherwise, we just - % send the message to any broker as this will later on trigger a new - % metadata request. There might be better ways of handling this, but - % you should not be constantly sending messages to new topics anyway - case lists:any(fun({{T, _}, _}) -> T =:= Topic end, Mapping) of - true -> - lager:error("Dropping ~p sent to topic ~p, partition ~p", - [Payload, Topic, Partition]), - {error, invalid_topic}; - false -> - [{_, Broker} | _] = Mapping, - kafkerl_broker_connection:send(Broker, Message) - end; - {{_, Broker}, _} -> - kafkerl_broker_connection:send(Broker, Message) - end, - NewKnownTopics = lists:umerge([Topic], KnownTopics), - {reply, ok, State#state{known_topics = NewKnownTopics}}. 
+ case lists:any(fun({K, _}) -> K =:= {Topic, Partition} end, Mapping) of + true -> + % We need to check if the topic/partition pair exists, this is because the + % ets takes some time to start, so some messages could be lost. + % Therefore if we have the topic/partition, just send it again (the order + % will suffer though) + {reply, send(self(), Message), State}; + false -> + % Now, if the topic/partition was not valid, we need to check if the topic + % exists, if it does, just drop the message as we can assume no partitions + % are created. + case lists:any(fun({{T, _}, _}) -> T =:= Topic end, Mapping) of + true -> + lager:error("Dropping ~p sent to topic ~p, partition ~p", + [Payload, Topic, Partition]), + {reply, ok, State}; + false -> + NewKnownTopics = lists:umerge([Topic], KnownTopics), + NewState = State#state{pending = [Message | Pending]}, + {reply, ok, handle_request_metadata(NewState, NewKnownTopics)} + end + end. handle_get_partitions(#state{broker_mapping = void}) -> {error, not_available}; @@ -359,12 +364,19 @@ get_broker_mapping([], _State, _N, Acc) -> [{Key, Address} || {_ConnId, Key, Address} <- Acc]; get_broker_mapping([{{Topic, Partition, ConnId}, Address} | T], State = #state{config = Config}, N, Acc) -> + Buffer = kafkerl_utils:buffer_name(Topic, Partition), + _ = ets_buffer:create(Buffer, fifo), {Conn, NewN} = case lists:keyfind(ConnId, 1, Acc) of false -> {start_broker_connection(N, Address, Config), N + 1}; {ConnId, _, BrokerConnection} -> {BrokerConnection, N} end, + + Buffer = kafkerl_utils:buffer_name(Topic, Partition), + _ = ets_buffer:create(Buffer, fifo), + kafkerl_broker_connection:add_buffer(Conn, Buffer), + NewMapping = {ConnId, {Topic, Partition}, Conn}, get_broker_mapping(T, State, NewN, [NewMapping | Acc]). 
@@ -373,8 +385,8 @@ start_broker_connection(N, Address, Config) -> {ok, Name, _Pid} -> Name; {error, {already_started, Pid}} -> - kafkerl_broker_connection:kill(Pid), - start_broker_connection(N, Address, Config) + kafkerl_broker_connection:clear_buffers(Pid), + Pid end. % This is used to return the available partitions for each topic diff --git a/src/kafkerl_sup.erl b/src/kafkerl_sup.erl index ba8b2a7..b720d83 100644 --- a/src/kafkerl_sup.erl +++ b/src/kafkerl_sup.erl @@ -30,26 +30,14 @@ init([]) -> lager:notice("Kafkerl is disabled, ignoring"), []; false -> - [get_connector_child_spec(), get_buffer_child_spec()] + [get_connector_child_spec()] end, {ok, {{one_for_one, 5, 10}, ChildSpecs}}. get_connector_child_spec() -> - Name = case application:get_env(kafkerl, gen_server_name) of - {ok, Value} -> Value; - _ -> kafkerl - end, + Name = application:get_env(kafkerl, gen_server_name, kafkerl), {ok, ConnConfig} = application:get_env(kafkerl, conn_config), - Topics = case application:get_env(kafkerl, topics) of - {ok, Any} -> Any; - undefined -> [] - end, + Topics = application:get_env(kafkerl, topics, []), Params = [Name, [{topics, Topics} | ConnConfig]], MFA = {kafkerl_connector, start_link, Params}, - {Name, MFA, permanent, 2000, worker, [kafkerl_connector]}. - -get_buffer_child_spec() -> - {ok, BufferConfig} = application:get_env(kafkerl, buffer), - - MFA = {kafkerl_buffer, start_link, [BufferConfig]}, - {?ETS_BUFFER, MFA, permanent, 2000, worker, [kafkerl_buffer]}. \ No newline at end of file + {Name, MFA, permanent, 2000, worker, [kafkerl_connector]}. \ No newline at end of file diff --git a/src/kafkerl_utils.erl b/src/kafkerl_utils.erl index 7cc3d8c..7d28f53 100644 --- a/src/kafkerl_utils.erl +++ b/src/kafkerl_utils.erl @@ -4,6 +4,7 @@ -export([send_event/2, send_error/2]). -export([get_tcp_options/1]). -export([merge_messages/1]). +-export([buffer_name/2]). -include("kafkerl.hrl"). -include("kafkerl_consumers.hrl"). 
@@ -41,6 +42,11 @@ get_tcp_options(Options) -> % TODO: refactor merge_messages(Topics) -> merge_topics(Topics). +-spec buffer_name(topic(), partition()) -> atom(). +buffer_name(Topic, Partition) -> + Bin = <<Topic/binary, $., (integer_to_binary(Partition))/binary>>, + binary_to_atom(Bin, utf8). + %%============================================================================== %% Utils %%==============================================================================