diff --git a/rel/kafkerl.app.config b/rel/kafkerl.app.config index 1457901..ce4b571 100644 --- a/rel/kafkerl.app.config +++ b/rel/kafkerl.app.config @@ -1,6 +1,6 @@ [{lager, [{colored, true}, {handlers, [{lager_console_backend, [debug,true]}]}]}, - {kafkerl, [{gen_server_name, kafkerl_client}, + {kafkerl, [%{gen_server_name, kafkerl_client}, {conn_config, [{brokers, [{"localhost", 9092}]}, {client_id, kafkerl_client}, {flush_every, 60}, @@ -9,5 +9,5 @@ {max_metadata_retries, -1}, {retry_on_topic_error, true}, {metadata_tcp_timeout, 1000}]}, - {topics, [test1, test2, test3, test4]}, + {topics, [test1, test2, test3]}, {buffer, [{max_queue_size, 4}]}]}]. \ No newline at end of file diff --git a/src/kafkerl.erl b/src/kafkerl.erl index ce6e128..b3f3340 100644 --- a/src/kafkerl.erl +++ b/src/kafkerl.erl @@ -4,7 +4,9 @@ -export([start/0, start/2]). -export([produce/1, produce/2, get_partitions/0, get_partitions/1, - subscribe/1, subscribe/2]). + subscribe/1, subscribe/2, + unsubscribe/1, unsubscribe/2, + request_metadata/0, request_metadata/1, request_metadata/2]). -include("kafkerl.hrl"). -include("kafkerl_consumers.hrl"). @@ -48,4 +50,16 @@ unsubscribe(Callback) -> unsubscribe(?MODULE, Callback). -spec unsubscribe(atom(), callback()) -> ok. unsubscribe(Name, Callback) -> - kafkerl_connector:unsubscribe(Name, Callback). \ No newline at end of file + kafkerl_connector:unsubscribe(Name, Callback). + +-spec request_metadata() -> ok. +request_metadata() -> + request_metadata(?MODULE). +-spec request_metadata(atom() | [topic()]) -> ok. +request_metadata(Name) when is_atom(Name) -> + kafkerl_connector:request_metadata(Name); +request_metadata(Topics) -> + request_metadata(?MODULE, Topics). +-spec request_metadata(atom(), [topic()]) -> ok. +request_metadata(Name, Topics) -> + kafkerl_connector:request_metadata(Name, Topics). \ No newline at end of file diff --git a/src/kafkerl_broker_connection.erl b/src/kafkerl_broker_connection.erl index 9470627..7cf6d76 100644 --- a/src/kafkerl_broker_connection.erl +++ b/src/kafkerl_broker_connection.erl @@ -4,7 +4,7 @@ -behaviour(gen_server). %% API --export([send/2, flush/1]). +-export([send/2, flush/1, kill/1]). % Only for internal use -export([connect/5]). % Supervisors @@ -19,7 +19,8 @@ -type conn_idx() :: 0..1023. -type start_link_response() :: {ok, atom(), pid()} | ignore | {error, any()}. --record(state, {conn_idx = undefined :: conn_idx(), +-record(state, {name = undefined :: atom(), + conn_idx = undefined :: conn_idx(), client_id = undefined :: binary(), socket = undefined :: undefined | port(), address = undefined :: undefined | socket_address(), @@ -38,8 +39,8 @@ -spec start_link(conn_idx(), pid(), socket_address(), any()) -> start_link_response(). start_link(Id, Connector, Address, Config) -> - Params = [Id, Connector, Address, Config], Name = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ integer_to_list(Id)), + Params = [Id, Connector, Address, Config, Name], case gen_server:start_link({local, Name}, ?MODULE, Params, []) of {ok, Pid} -> {ok, Name, Pid}; @@ -55,6 +56,10 @@ send(ServerRef, Message) -> flush(ServerRef) -> gen_server:call(ServerRef, {flush}). +-spec kill(server_ref()) -> ok. +kill(ServerRef) -> + gen_server:call(ServerRef, {kill}). + %%============================================================================== %% gen_server callbacks %%============================================================================== @@ -62,7 +67,11 @@ flush(ServerRef) -> handle_call({send, Message}, _From, State) -> handle_send(Message, State); handle_call({flush}, _From, State) -> - handle_flush(State). + handle_flush(State); +handle_call({kill}, _From, State = #state{name = Name}) -> + % TODO: handle the potentially buffered messages + lager:info("~p stopped by it's parent connector", [Name]), + {stop, normal, ok, State}. -spec handle_info(any(), state()) -> {noreply, state()}. handle_info({connected, Socket}, State) -> @@ -92,7 +101,7 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%============================================================================== %% Handlers %%============================================================================== -init([Id, Connector, Address, Config]) -> +init([Id, Connector, Address, Config, Name]) -> Schema = [{tcp_options, [any], {default, []}}, {retry_interval, positive_integer, {default, 1000}}, {max_retries, positive_integer, {default, 3}}, @@ -100,9 +109,9 @@ init([Id, Connector, Address, Config]) -> case normalizerl:normalize_proplist(Schema, Config) of {ok, [TCPOpts, RetryInterval, MaxRetries, ClientId]} -> NewTCPOpts = kafkerl_utils:get_tcp_options(TCPOpts), - State = #state{conn_idx = Id, address = Address, connector = Connector, + State = #state{conn_idx = Id, tcp_options = NewTCPOpts, address = Address, max_retries = MaxRetries, retry_interval = RetryInterval, - tcp_options = NewTCPOpts, client_id = ClientId}, + connector = Connector, client_id = ClientId, name = Name}, Params = [self(), NewTCPOpts, Address, RetryInterval, MaxRetries], _Pid = spawn_link(?MODULE, connect, Params), {ok, State}; @@ -164,10 +173,10 @@ handle_tcp_data(Bin, State = #state{connector = Connector}) -> {ok, Messages} -> Errors = get_errors_from_produce_response(Topics), case handle_errors(Errors, Messages) of - {ignore, _} -> + ignore -> State; {request_metadata, MessagesToResend} -> - Connector ! force_metadata_request, + kafkerl_connector:request_metadata(Connector), F = fun(M) -> kafkerl_connector:send(Connector, M) end, ok = lists:foreach(F, MessagesToResend), State @@ -197,38 +206,51 @@ build_correlation_id(State = #state{request_number = RequestNumber, get_errors_from_produce_response(Topics) -> get_errors_from_produce_response(Topics, []). +get_errors_from_produce_response([], Acc) -> + lists:reverse(Acc); get_errors_from_produce_response([{Topic, Partitions} | T], Acc) -> Errors = [{Topic, Partition, Error} || {Partition, Error, _} <- Partitions, Error =/= ?NO_ERROR], get_errors_from_produce_response(T, Acc ++ Errors). -handle_errors([] = Errors, _Messages) -> - {ignore, Errors}; +handle_errors([], _Messages) -> + ignore; handle_errors(Errors, Messages) -> - F = fun(E, Acc) -> handle_error(E, Messages, Acc) end, - lists:foreach(F, {ignore, []}, Errors). + F = fun(E) -> handle_error(E, Messages) end, + case lists:filtermap(F, Errors) of + [] -> ignore; + L -> {request_metadata, L} + end. -handle_error({Topic, Partition, Error}, Messages, {_Status, Acc} = Status) +handle_error({Topic, Partition, Error}, Messages) when Error =:= ?UNKNOWN_TOPIC_OR_PARTITION orelse Error =:= ?LEADER_NOT_AVAILABLE orelse Error =:= ?NOT_LEADER_FOR_PARTITION -> case get_message_for_error(Topic, Partition, Messages) of - undefined -> Status; - Message -> {request_metadata, [Message | Acc]} + undefined -> false; + Message -> {true, Message} end; -handle_error({Topic, Partition, Error}, _Messages, Acc) -> +handle_error({Topic, Partition, Error}, _Messages) -> lager:error("Unable to handle ~p error on topic ~p, partition ~p", [kafkerl_error:get_error_name(Error), Topic, Partition]), - Acc. - -get_message_for_error(Topic, Partition, []) -> - lager:error("no saved message found for error on topic ~p, partition ~p", - [Topic, Partition]), - undefined; -get_message_for_error(Topic, Partition, [{Topic, Partition, _} = H | _T]) -> - H; -get_message_for_error(Topic, Partition, [_Message | T]) -> - get_message_for_error(Topic, Partition, T). + false. + +get_message_for_error(Topic, Partition, SavedMessages) -> + case lists:keyfind(Topic, 1, SavedMessages) of + false -> + lager:error("No saved messages found for topic ~p, partition ~p", + [Topic, Partition]), + undefined; + {Topic, Partitions} -> + case lists:keyfind(Partition, 1, Partitions) of + false -> + lager:error("No saved messages found for topic ~p, partition ~p", + [Topic, Partition]), + undefined; + {Partition, Messages} -> + {Topic, Partition, Messages} + end + end. connect(Pid, _TCPOpts, {Host, Port} = _Address, _Timeout, 0) -> lager:error("Unable to connect to ~p:~p", [Host, Port]), diff --git a/src/kafkerl_buffer.erl b/src/kafkerl_buffer.erl index 7df6189..cb0dda6 100644 --- a/src/kafkerl_buffer.erl +++ b/src/kafkerl_buffer.erl @@ -132,8 +132,8 @@ handle_get_saved_request(CorrelationId, State) -> {reply, Response, State}. handle_delete_saved_request(CorrelationId, - State = #state{message_buffer = MessageBuffer}) -> - case lists:keytake(CorrelationId, 1, MessageBuffer) of + State = #state{saved_requests = Requests}) -> + case lists:keytake(CorrelationId, 1, Requests) of false -> {reply, {error, not_found}, State}; {value, {CorrelationId, _Bin, Messages}, NewMessageBuffer} -> diff --git a/src/kafkerl_connector.erl b/src/kafkerl_connector.erl index d90f19a..19913f7 100644 --- a/src/kafkerl_connector.erl +++ b/src/kafkerl_connector.erl @@ -4,8 +4,8 @@ -behaviour(gen_server). %% API --export([send/2, send/3, request_metadata/1, get_partitions/1, subscribe/2, - unsubscribe/2]). +-export([send/2, send/3, request_metadata/1, request_metadata/2, subscribe/2, + get_partitions/1, unsubscribe/2]). % Only for internal use -export([request_metadata/6]). % Supervisors @@ -26,12 +26,13 @@ -record(state, {brokers = [] :: [socket_address()], broker_mapping = void :: [broker_mapping()] | void, client_id = <<>> :: client_id(), - topics = [] :: [topic()], max_metadata_retries = -1 :: integer(), retry_interval = 1 :: non_neg_integer(), config = [] :: {atom(), any()}, retry_on_topic_error = false :: boolean(), - callbacks = [] :: [callback()]}). + callbacks = [] :: [callback()], + known_topics = [] :: [binary()], + pending = [] :: [basic_message()]}). -type state() :: #state{}. %%============================================================================== @@ -42,7 +43,7 @@ start_link(Name, Config) -> gen_server:start_link({local, Name}, ?MODULE, [Config], []). -spec send(server_ref(), basic_message()) -> ok | error(). -send(ServerRef, Message) when is_atom(ServerRef) -> +send(ServerRef, Message) -> send(ServerRef, Message, 1000). -spec send(server_ref(), basic_message(), integer() | infinity) -> ok | error(). send(ServerRef, Message, Timeout) -> @@ -68,15 +69,21 @@ unsubscribe(ServerRef, Callback) -> request_metadata(ServerRef) -> gen_server:call(ServerRef, {request_metadata}). +-spec request_metadata(server_ref(), [topic()]) -> ok. +request_metadata(ServerRef, Topics) -> + gen_server:call(ServerRef, {request_metadata, Topics}). + %%============================================================================== %% gen_server callbacks %%============================================================================== -spec handle_call(any(), any(), state()) -> {reply, ok, state()} | {reply, {error, any()}, state()}. handle_call({send, Message}, _From, State) -> - {reply, handle_send(Message, State), State}; + handle_send(Message, State); handle_call({request_metadata}, _From, State) -> - {reply, ok, handle_request_metadata(State)}; + {reply, ok, handle_request_metadata(State, [])}; +handle_call({request_metadata, Topics}, _From, State) -> + {reply, ok, handle_request_metadata(State, Topics)}; handle_call({get_partitions}, _From, State) -> {reply, handle_get_partitions(State), State}; handle_call({subscribe, Callback}, _From, State) -> @@ -86,17 +93,30 @@ handle_call({unsubscribe, Callback}, _From, State) -> handle_info(metadata_timeout, State) -> {stop, {error, unable_to_retrieve_metadata}, State}; -handle_info({metadata_updated, Mapping}, State) -> - BrokerMapping = get_broker_mapping(Mapping, State), - lager:debug("Refreshed topic mapping: ~p", [BrokerMapping]), - PartitionData = get_partitions_from_mapping(BrokerMapping), +handle_info({metadata_updated, []}, State) -> + % If the metadata arrived empty request it again + {noreply, handle_request_metadata(State, [])}; +handle_info({metadata_updated, Mapping}, State = #state{pending = Pending}) -> + % Create the topic mapping (this also starts the broker connections) + NewBrokerMapping = get_broker_mapping(Mapping, State), + lager:debug("Refreshed topic mapping: ~p", [NewBrokerMapping]), + % Get the partition data to send to the subscribers and send it + PartitionData = get_partitions_from_mapping(NewBrokerMapping), NewCallbacks = lists:filter(fun(Callback) -> kafkerl_utils:send_event(partition_update, Callback, PartitionData) =:= ok end, State#state.callbacks), - NewState = State#state{broker_mapping = BrokerMapping, - callbacks = NewCallbacks}, + % Add to the list of known topics + NewTopics = lists:sort([T || {T, _P} <- PartitionData]), + NewKnownTopics = lists:umerge(NewTopics, State#state.known_topics), + lager:debug("Known topics: ~p", [NewKnownTopics]), + NewState = State#state{broker_mapping = NewBrokerMapping, + callbacks = NewCallbacks, + known_topics = NewKnownTopics, + pending = []}, + F = fun(P) -> handle_send(P, NewState) end, + ok = lists:foreach(F, Pending), {noreply, NewState}; handle_info(Msg, State) -> lager:notice("Unexpected info message received: ~p on ~p", [Msg, State]), @@ -124,13 +144,13 @@ init([Config]) -> {ok, [Brokers, MaxMetadataRetries, ClientId, Topics, RetryInterval, RetryOnTopicError]} -> State = #state{config = Config, - topics = Topics, + known_topics = Topics, brokers = Brokers, client_id = ClientId, retry_interval = RetryInterval, retry_on_topic_error = RetryOnTopicError, max_metadata_retries = MaxMetadataRetries}, - Request = metadata_request(State), + Request = metadata_request(State, Topics), % Start requesting metadata Params = [self(), Brokers, get_metadata_tcp_options(), MaxMetadataRetries, RetryInterval, Request], @@ -143,16 +163,32 @@ init([Config]) -> {stop, bad_config} end. -handle_send(Message, #state{broker_mapping = Mapping}) -> +handle_send(Message, State = #state{broker_mapping = void, + pending = Pending}) -> + % If we are waiting for the metadata, just save the message and move on + % TODO: Using the buffer instead of a list in the gen_server will be safer + {reply, ok, State#state{pending = [Message | Pending]}}; +handle_send(Message, State = #state{broker_mapping = Mapping, + known_topics = KnownTopics, + retry_on_topic_error = RetryTopics}) -> {Topic, Partition, Payload} = Message, - case lists:keyfind({Topic, Partition}, 1, Mapping) of - false -> - lager:error("Dropping ~p sent to topic ~p, partition ~p, reason: ~p", - [Payload, Topic, Partition, no_broker]), - {error, invalid_topic_or_partition}; - {_, Broker} -> - kafkerl_broker_connection:send(Broker, Message) - end. + ok = case {lists:keyfind({Topic, Partition}, 1, Mapping), RetryTopics} of + {false, false} -> + % When retry topics is false, just fail + lager:error("Dropping ~p sent to topic ~p, partition ~p, reason: ~p", + [Payload, Topic, Partition, no_broker]), + {error, invalid_topic_or_partition}; + {false, true} -> + % Send the message to any broker, this will eventually trigger a new + % metadata request, there might be better ways of handling this, but + % you should not be constantly sending messages to new topics anyway + [{_, Broker} | _] = Mapping, + kafkerl_broker_connection:send(Broker, Message); + {{_, Broker}, _} -> + kafkerl_broker_connection:send(Broker, Message) + end, + NewKnownTopics = lists:umerge([Topic], KnownTopics), + {reply, ok, State#state{known_topics = NewKnownTopics}}. handle_get_partitions(#state{broker_mapping = void}) -> {error, not_available}; @@ -160,16 +196,17 @@ handle_get_partitions(#state{broker_mapping = Mapping}) -> {ok, Mapping}. % Ignore it if the topic mapping is void, we are already requesting the metadata -handle_request_metadata(State = #state{broker_mapping = void}) -> +handle_request_metadata(State = #state{broker_mapping = void}, _Topics) -> State; -handle_request_metadata(State = #state{brokers = Brokers, - retry_interval = RetryInterval, - max_metadata_retries = MaxRetries}) -> - Request = metadata_request(State), - Params = [self(), Brokers, get_metadata_tcp_options(), MaxRetries, - RetryInterval, Request], +handle_request_metadata(State, NewTopics) -> + SortedNewTopics = lists:sort(NewTopics), + NewKnownTopics = lists:umerge(State#state.known_topics, SortedNewTopics), + Request = metadata_request(State, NewKnownTopics), + Params = [self(), State#state.brokers, get_metadata_tcp_options(), + State#state.max_metadata_retries, State#state.retry_interval, + Request], _Pid = spawn_link(?MODULE, request_metadata, Params), - State#state{broker_mapping = void}. + State#state{broker_mapping = void, known_topics = NewKnownTopics}. %%============================================================================== %% Utils @@ -235,8 +272,12 @@ request_metadata([{Host, Port} = _Broker | T] = _Brokers, TCPOpts, Request) -> %%============================================================================== %% Request building %%============================================================================== -metadata_request(#state{topics = Topics, client_id = ClientId}) -> - kafkerl_protocol:build_metadata_request(Topics, 0, ClientId). +metadata_request(#state{client_id = ClientId}, [] = _NewTopics) -> + kafkerl_protocol:build_metadata_request([], 0, ClientId); +metadata_request(#state{known_topics = KnownTopics, client_id = ClientId}, + NewTopics) -> + AllTopics = lists:umerge(KnownTopics, NewTopics), + kafkerl_protocol:build_metadata_request(AllTopics, 0, ClientId). %%============================================================================== %% Topic/broker mapping @@ -261,7 +302,7 @@ expand_topic({0, Topic, Partitions}) -> expand_topic({Error, Topic, _Partitions}) -> lager:error("Error ~p on metadata for topic ~p", [kafkerl_error:get_error_name(Error), Topic]), - false. + {true, {Topic, []}}. expand_partitions(Metadata) -> expand_partitions(Metadata, []). @@ -297,7 +338,8 @@ start_broker_connection(N, Address, Config) -> {ok, Name, _Pid} -> Name; {error, {already_started, Pid}} -> - Pid + kafkerl_broker_connection:kill(Pid), + start_broker_connection(N, Address, Config) end. % This is used to return the available partitions for each topic diff --git a/src/kafkerl_protocol.erl b/src/kafkerl_protocol.erl index 2d88826..1d9d857 100644 --- a/src/kafkerl_protocol.erl +++ b/src/kafkerl_protocol.erl @@ -106,8 +106,8 @@ build_produce_request({Topic, Partition, Messages}, Compression) -> TopicSize = byte_size(Topic), {Size, MessageSet} = build_message_set(Messages, Compression), {Size + TopicSize + 24, - [<<0:16/unsigned-integer, % RequiredAcks - -1:32/unsigned-integer, % Timeout + [<<-1:16/signed-integer, + -1:32/signed-integer, % Timeout 1:32/unsigned-integer, % TopicCount TopicSize:16/unsigned-integer>>, Topic, @@ -118,14 +118,12 @@ build_produce_request({Topic, Partition, Messages}, Compression) -> build_produce_request(Data, Compression) -> % Build the body of the request with multiple topics/partitions % (Docs at: http://goo.gl/J3C50c) - RequiredAcks = 0, - Timeout = -1, TopicCount = length(Data), {TopicsSize, Topics} = build_topics(Data, Compression), % 10 is the size of the header {TopicsSize + 10, - [<>, Topics]}. @@ -289,6 +287,9 @@ build_fetch_partition({Partition, Offset, MaxBytes}) -> Offset:64/unsigned-integer, MaxBytes:32/unsigned-integer>>}. +build_metadata_request([]) -> + % Builds an empty metadata request that returns all topics and partitions + {4, <<0:32/unsigned-integer>>}; build_metadata_request(Topic) when is_binary(Topic) -> build_metadata_request([Topic]); build_metadata_request(Topics) -> @@ -337,8 +338,8 @@ parse_produced_partitions(Count, <>, Acc) -> - Partition = {Partition, ErrorCode, Offset}, - parse_produced_partitions(Count - 1, Remainder, [Partition | Acc]). + PartitionData = {Partition, ErrorCode, Offset}, + parse_produced_partitions(Count - 1, Remainder, [PartitionData | Acc]). % Parse fetch response (http://goo.gl/eba5z3) diff --git a/src/kafkerl_sup.erl b/src/kafkerl_sup.erl index ad2de4f..240f0e7 100644 --- a/src/kafkerl_sup.erl +++ b/src/kafkerl_sup.erl @@ -34,8 +34,10 @@ get_connector_child_spec() -> _ -> kafkerl end, {ok, ConnConfig} = application:get_env(kafkerl, conn_config), - {ok, Topics} = application:get_env(kafkerl, topics), - + Topics = case application:get_env(kafkerl, topics) of + {ok, Any} -> Any; + undefined -> [] + end, Params = [Name, [{topics, Topics} | ConnConfig]], MFA = {kafkerl_connector, start_link, Params}, {Name, MFA, permanent, 2000, worker, [kafkerl_connector]}.