From 130ccffb1e96c1502e0bb7867faf0cd2c1e9436b Mon Sep 17 00:00:00 2001 From: Hernan Rivas Acosta Date: Thu, 14 Aug 2014 16:12:45 -0300 Subject: [PATCH 1/5] added support for requesting all topics by specifying an empty array --- src/kafkerl_protocol.erl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/kafkerl_protocol.erl b/src/kafkerl_protocol.erl index 2d88826..e4e98c6 100644 --- a/src/kafkerl_protocol.erl +++ b/src/kafkerl_protocol.erl @@ -289,6 +289,9 @@ build_fetch_partition({Partition, Offset, MaxBytes}) -> Offset:64/unsigned-integer, MaxBytes:32/unsigned-integer>>}. +build_metadata_request([]) -> + % Builds an empty metadata request that returns all topics and partitions + {4, <<0:32/unsigned-integer>>}; build_metadata_request(Topic) when is_binary(Topic) -> build_metadata_request([Topic]); build_metadata_request(Topics) -> From d30d1e08f34d5c3f3d4326cc68b5ee2fbb343ac4 Mon Sep 17 00:00:00 2001 From: Hernan Rivas Acosta Date: Thu, 14 Aug 2014 17:59:22 -0300 Subject: [PATCH 2/5] keeping track of the known topics and added a new function to request metadata --- rel/kafkerl.app.config | 4 +-- src/kafkerl.erl | 18 +++++++++-- src/kafkerl_connector.erl | 64 +++++++++++++++++++++++++-------------- 3 files changed, 59 insertions(+), 27 deletions(-) diff --git a/rel/kafkerl.app.config b/rel/kafkerl.app.config index 1457901..ce4b571 100644 --- a/rel/kafkerl.app.config +++ b/rel/kafkerl.app.config @@ -1,6 +1,6 @@ [{lager, [{colored, true}, {handlers, [{lager_console_backend, [debug,true]}]}]}, - {kafkerl, [{gen_server_name, kafkerl_client}, + {kafkerl, [%{gen_server_name, kafkerl_client}, {conn_config, [{brokers, [{"localhost", 9092}]}, {client_id, kafkerl_client}, {flush_every, 60}, @@ -9,5 +9,5 @@ {max_metadata_retries, -1}, {retry_on_topic_error, true}, {metadata_tcp_timeout, 1000}]}, - {topics, [test1, test2, test3, test4]}, + {topics, [test1, test2, test3]}, {buffer, [{max_queue_size, 4}]}]}]. \ No newline at end of file diff --git a/src/kafkerl.erl b/src/kafkerl.erl index ce6e128..b3f3340 100644 --- a/src/kafkerl.erl +++ b/src/kafkerl.erl @@ -4,7 +4,9 @@ -export([start/0, start/2]). -export([produce/1, produce/2, get_partitions/0, get_partitions/1, - subscribe/1, subscribe/2]). + subscribe/1, subscribe/2, + unsubscribe/1, unsubscribe/2, + request_metadata/0, request_metadata/1, request_metadata/2]). -include("kafkerl.hrl"). -include("kafkerl_consumers.hrl"). @@ -48,4 +50,16 @@ unsubscribe(Callback) -> unsubscribe(?MODULE, Callback). -spec unsubscribe(atom(), callback()) -> ok. unsubscribe(Name, Callback) -> - kafkerl_connector:unsubscribe(Name, Callback). \ No newline at end of file + kafkerl_connector:unsubscribe(Name, Callback). + +-spec request_metadata() -> ok. +request_metadata() -> + request_metadata(?MODULE). +-spec request_metadata(atom() | [topic()]) -> ok. +request_metadata(Name) when is_atom(Name) -> + kafkerl_connector:request_metadata(Name); +request_metadata(Topics) -> + request_metadata(?MODULE, Topics). +-spec request_metadata(atom(), [topic()]) -> ok. +request_metadata(Name, Topics) -> + kafkerl_connector:request_metadata(Name, Topics). \ No newline at end of file diff --git a/src/kafkerl_connector.erl b/src/kafkerl_connector.erl index d90f19a..00a9640 100644 --- a/src/kafkerl_connector.erl +++ b/src/kafkerl_connector.erl @@ -4,8 +4,8 @@ -behaviour(gen_server). %% API --export([send/2, send/3, request_metadata/1, get_partitions/1, subscribe/2, - unsubscribe/2]). +-export([send/2, send/3, request_metadata/1, request_metadata/2, subscribe/2, + get_partitions/1, unsubscribe/2]). % Only for internal use -export([request_metadata/6]). % Supervisors @@ -26,12 +26,12 @@ -record(state, {brokers = [] :: [socket_address()], broker_mapping = void :: [broker_mapping()] | void, client_id = <<>> :: client_id(), - topics = [] :: [topic()], max_metadata_retries = -1 :: integer(), retry_interval = 1 :: non_neg_integer(), config = [] :: {atom(), any()}, retry_on_topic_error = false :: boolean(), - callbacks = [] :: [callback()]}). + callbacks = [] :: [callback()], + known_topics = [] :: [binary()]}). -type state() :: #state{}. %%============================================================================== @@ -68,6 +68,10 @@ unsubscribe(ServerRef, Callback) -> request_metadata(ServerRef) -> gen_server:call(ServerRef, {request_metadata}). +-spec request_metadata(server_ref(), [topic()]) -> ok. +request_metadata(ServerRef, Topics) -> + gen_server:call(ServerRef, {request_metadata, Topics}). + %%============================================================================== %% gen_server callbacks %%============================================================================== @@ -76,7 +80,9 @@ request_metadata(ServerRef) -> handle_call({send, Message}, _From, State) -> {reply, handle_send(Message, State), State}; handle_call({request_metadata}, _From, State) -> - {reply, ok, handle_request_metadata(State)}; + {reply, ok, handle_request_metadata(State, [])}; +handle_call({request_metadata, Topics}, _From, State) -> + {reply, ok, handle_request_metadata(State, Topics)}; handle_call({get_partitions}, _From, State) -> {reply, handle_get_partitions(State), State}; handle_call({subscribe, Callback}, _From, State) -> @@ -87,17 +93,24 @@ handle_call({unsubscribe, Callback}, _From, State) -> handle_info(metadata_timeout, State) -> {stop, {error, unable_to_retrieve_metadata}, State}; handle_info({metadata_updated, Mapping}, State) -> - BrokerMapping = get_broker_mapping(Mapping, State), - lager:debug("Refreshed topic mapping: ~p", [BrokerMapping]), - PartitionData = get_partitions_from_mapping(BrokerMapping), + % Create the topic mapping (this also starts the broker connections) + NewBrokerMapping = get_broker_mapping(Mapping, State), + lager:debug("Refreshed topic mapping: ~p", [NewBrokerMapping]), + % Get the partition data to send to the subscribers and send it + PartitionData = get_partitions_from_mapping(NewBrokerMapping), NewCallbacks = lists:filter(fun(Callback) -> kafkerl_utils:send_event(partition_update, Callback, PartitionData) =:= ok end, State#state.callbacks), - NewState = State#state{broker_mapping = BrokerMapping, - callbacks = NewCallbacks}, - {noreply, NewState}; + % Add to the list of known topics + NewTopics = lists:sort([T || {T, _P} <- PartitionData]), + NewKnownTopics = lists:umerge(NewTopics, State#state.known_topics), + lager:debug("Known topics: ~p", [NewKnownTopics]), + {noreply, + State#state{broker_mapping = NewBrokerMapping, + callbacks = NewCallbacks, + known_topics = NewKnownTopics}}; handle_info(Msg, State) -> lager:notice("Unexpected info message received: ~p on ~p", [Msg, State]), {noreply, State}. @@ -124,13 +137,13 @@ init([Config]) -> {ok, [Brokers, MaxMetadataRetries, ClientId, Topics, RetryInterval, RetryOnTopicError]} -> State = #state{config = Config, - topics = Topics, + known_topics = Topics, brokers = Brokers, client_id = ClientId, retry_interval = RetryInterval, retry_on_topic_error = RetryOnTopicError, max_metadata_retries = MaxMetadataRetries}, - Request = metadata_request(State), + Request = metadata_request(State, Topics), % Start requesting metadata Params = [self(), Brokers, get_metadata_tcp_options(), MaxMetadataRetries, RetryInterval, Request], @@ -160,16 +173,17 @@ handle_get_partitions(#state{broker_mapping = Mapping}) -> {ok, Mapping}. % Ignore it if the topic mapping is void, we are already requesting the metadata -handle_request_metadata(State = #state{broker_mapping = void}) -> +handle_request_metadata(State = #state{broker_mapping = void}, _Topics) -> State; -handle_request_metadata(State = #state{brokers = Brokers, - retry_interval = RetryInterval, - max_metadata_retries = MaxRetries}) -> - Request = metadata_request(State), - Params = [self(), Brokers, get_metadata_tcp_options(), MaxRetries, - RetryInterval, Request], +handle_request_metadata(State, NewTopics) -> + SortedNewTopics = lists:sort(NewTopics), + Request = metadata_request(State, SortedNewTopics), + Params = [self(), State#state.brokers, get_metadata_tcp_options(), + State#state.max_metadata_retries, State#state.retry_interval, + Request], _Pid = spawn_link(?MODULE, request_metadata, Params), - State#state{broker_mapping = void}. + NewKnownTopics = lists:umerge(State#state.known_topics, SortedNewTopics), + State#state{broker_mapping = void, known_topics = NewKnownTopics}. %%============================================================================== %% Utils @@ -235,8 +249,12 @@ request_metadata([{Host, Port} = _Broker | T] = _Brokers, TCPOpts, Request) -> %%============================================================================== %% Request building %%============================================================================== -metadata_request(#state{topics = Topics, client_id = ClientId}) -> - kafkerl_protocol:build_metadata_request(Topics, 0, ClientId). +metadata_request(#state{client_id = ClientId}, [] = _NewTopics) -> + kafkerl_protocol:build_metadata_request([], 0, ClientId); +metadata_request(#state{known_topics = KnownTopics, client_id = ClientId}, + NewTopics) -> + AllTopics = lists:umerge(KnownTopics, NewTopics), + kafkerl_protocol:build_metadata_request(AllTopics, 0, ClientId). %%============================================================================== %% Topic/broker mapping From f3e578caeaebcf3b3fa962936be2259e926c41ae Mon Sep 17 00:00:00 2001 From: Hernan Rivas Acosta Date: Thu, 14 Aug 2014 19:39:20 -0300 Subject: [PATCH 3/5] fixed a bug when killing broker connections --- src/kafkerl_broker_connection.erl | 23 ++++++++++++++++------- src/kafkerl_connector.erl | 5 +++-- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/kafkerl_broker_connection.erl b/src/kafkerl_broker_connection.erl index 9470627..aa23ac9 100644 --- a/src/kafkerl_broker_connection.erl +++ b/src/kafkerl_broker_connection.erl @@ -4,7 +4,7 @@ -behaviour(gen_server). %% API --export([send/2, flush/1]). +-export([send/2, flush/1, kill/1]). % Only for internal use -export([connect/5]). % Supervisors @@ -19,7 +19,8 @@ -type conn_idx() :: 0..1023. -type start_link_response() :: {ok, atom(), pid()} | ignore | {error, any()}. --record(state, {conn_idx = undefined :: conn_idx(), +-record(state, {name = undefined :: atom(), + conn_idx = undefined :: conn_idx(), client_id = undefined :: binary(), socket = undefined :: undefined | port(), address = undefined :: undefined | socket_address(), @@ -38,8 +39,8 @@ -spec start_link(conn_idx(), pid(), socket_address(), any()) -> start_link_response(). start_link(Id, Connector, Address, Config) -> - Params = [Id, Connector, Address, Config], Name = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ integer_to_list(Id)), + Params = [Id, Connector, Address, Config, Name], case gen_server:start_link({local, Name}, ?MODULE, Params, []) of {ok, Pid} -> {ok, Name, Pid}; @@ -55,6 +56,10 @@ send(ServerRef, Message) -> flush(ServerRef) -> gen_server:call(ServerRef, {flush}). +-spec kill(server_ref()) -> ok. +kill(ServerRef) -> + gen_server:call(ServerRef, {kill}). + %%============================================================================== %% gen_server callbacks %%============================================================================== @@ -62,7 +67,11 @@ flush(ServerRef) -> handle_call({send, Message}, _From, State) -> handle_send(Message, State); handle_call({flush}, _From, State) -> - handle_flush(State). + handle_flush(State); +handle_call({kill}, _From, State = #state{name = Name}) -> + % TODO: handle the potentially buffered messages + lager:info("~p stopped by it's parent connector", [Name]), + {stop, normal, ok, State}. -spec handle_info(any(), state()) -> {noreply, state()}. handle_info({connected, Socket}, State) -> @@ -92,7 +101,7 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%============================================================================== %% Handlers %%============================================================================== -init([Id, Connector, Address, Config]) -> +init([Id, Connector, Address, Config, Name]) -> Schema = [{tcp_options, [any], {default, []}}, {retry_interval, positive_integer, {default, 1000}}, {max_retries, positive_integer, {default, 3}}, @@ -100,9 +109,9 @@ init([Id, Connector, Address, Config]) -> case normalizerl:normalize_proplist(Schema, Config) of {ok, [TCPOpts, RetryInterval, MaxRetries, ClientId]} -> NewTCPOpts = kafkerl_utils:get_tcp_options(TCPOpts), - State = #state{conn_idx = Id, address = Address, connector = Connector, + State = #state{conn_idx = Id, tcp_options = NewTCPOpts, address = Address, max_retries = MaxRetries, retry_interval = RetryInterval, - tcp_options = NewTCPOpts, client_id = ClientId}, + connector = Connector, client_id = ClientId, name = Name}, Params = [self(), NewTCPOpts, Address, RetryInterval, MaxRetries], _Pid = spawn_link(?MODULE, connect, Params), {ok, State}; diff --git a/src/kafkerl_connector.erl b/src/kafkerl_connector.erl index 00a9640..914be2e 100644 --- a/src/kafkerl_connector.erl +++ b/src/kafkerl_connector.erl @@ -279,7 +279,7 @@ expand_topic({0, Topic, Partitions}) -> expand_topic({Error, Topic, _Partitions}) -> lager:error("Error ~p on metadata for topic ~p", [kafkerl_error:get_error_name(Error), Topic]), - false. + {true, {Topic, []}}. expand_partitions(Metadata) -> expand_partitions(Metadata, []). @@ -315,7 +315,8 @@ start_broker_connection(N, Address, Config) -> {ok, Name, _Pid} -> Name; {error, {already_started, Pid}} -> - Pid + kafkerl_broker_connection:kill(Pid), + start_broker_connection(N, Address, Config) end. % This is used to return the available partitions for each topic From 9f9124641635fca704ccb66eb57d5a1e3d70d9e5 Mon Sep 17 00:00:00 2001 From: Hernan Rivas Acosta Date: Wed, 20 Aug 2014 16:30:54 -0300 Subject: [PATCH 4/5] improved the handling of certain partition errors --- src/kafkerl_broker_connection.erl | 51 ++++++++++++++++---------- src/kafkerl_buffer.erl | 4 +- src/kafkerl_connector.erl | 61 +++++++++++++++++++++---------- src/kafkerl_protocol.erl | 14 +++---- 4 files changed, 82 insertions(+), 48 deletions(-) diff --git a/src/kafkerl_broker_connection.erl b/src/kafkerl_broker_connection.erl index aa23ac9..7cf6d76 100644 --- a/src/kafkerl_broker_connection.erl +++ b/src/kafkerl_broker_connection.erl @@ -173,10 +173,10 @@ handle_tcp_data(Bin, State = #state{connector = Connector}) -> {ok, Messages} -> Errors = get_errors_from_produce_response(Topics), case handle_errors(Errors, Messages) of - {ignore, _} -> + ignore -> State; {request_metadata, MessagesToResend} -> - Connector ! force_metadata_request, + kafkerl_connector:request_metadata(Connector), F = fun(M) -> kafkerl_connector:send(Connector, M) end, ok = lists:foreach(F, MessagesToResend), State @@ -206,38 +206,51 @@ build_correlation_id(State = #state{request_number = RequestNumber, get_errors_from_produce_response(Topics) -> get_errors_from_produce_response(Topics, []). +get_errors_from_produce_response([], Acc) -> + lists:reverse(Acc); get_errors_from_produce_response([{Topic, Partitions} | T], Acc) -> Errors = [{Topic, Partition, Error} || {Partition, Error, _} <- Partitions, Error =/= ?NO_ERROR], get_errors_from_produce_response(T, Acc ++ Errors). -handle_errors([] = Errors, _Messages) -> - {ignore, Errors}; +handle_errors([], _Messages) -> + ignore; handle_errors(Errors, Messages) -> - F = fun(E, Acc) -> handle_error(E, Messages, Acc) end, - lists:foreach(F, {ignore, []}, Errors). + F = fun(E) -> handle_error(E, Messages) end, + case lists:filtermap(F, Errors) of + [] -> ignore; + L -> {request_metadata, L} + end. -handle_error({Topic, Partition, Error}, Messages, {_Status, Acc} = Status) +handle_error({Topic, Partition, Error}, Messages) when Error =:= ?UNKNOWN_TOPIC_OR_PARTITION orelse Error =:= ?LEADER_NOT_AVAILABLE orelse Error =:= ?NOT_LEADER_FOR_PARTITION -> case get_message_for_error(Topic, Partition, Messages) of - undefined -> Status; - Message -> {request_metadata, [Message | Acc]} + undefined -> false; + Message -> {true, Message} end; -handle_error({Topic, Partition, Error}, _Messages, Acc) -> +handle_error({Topic, Partition, Error}, _Messages) -> lager:error("Unable to handle ~p error on topic ~p, partition ~p", [kafkerl_error:get_error_name(Error), Topic, Partition]), - Acc. + false. -get_message_for_error(Topic, Partition, []) -> - lager:error("no saved message found for error on topic ~p, partition ~p", - [Topic, Partition]), - undefined; -get_message_for_error(Topic, Partition, [{Topic, Partition, _} = H | _T]) -> - H; -get_message_for_error(Topic, Partition, [_Message | T]) -> - get_message_for_error(Topic, Partition, T). +get_message_for_error(Topic, Partition, SavedMessages) -> + case lists:keyfind(Topic, 1, SavedMessages) of + false -> + lager:error("No saved messages found for topic ~p, partition ~p", + [Topic, Partition]), + undefined; + {Topic, Partitions} -> + case lists:keyfind(Partition, 1, Partitions) of + false -> + lager:error("No saved messages found for topic ~p, partition ~p", + [Topic, Partition]), + undefined; + {Partition, Messages} -> + {Topic, Partition, Messages} + end + end. connect(Pid, _TCPOpts, {Host, Port} = _Address, _Timeout, 0) -> lager:error("Unable to connect to ~p:~p", [Host, Port]), diff --git a/src/kafkerl_buffer.erl b/src/kafkerl_buffer.erl index 7df6189..cb0dda6 100644 --- a/src/kafkerl_buffer.erl +++ b/src/kafkerl_buffer.erl @@ -132,8 +132,8 @@ handle_get_saved_request(CorrelationId, State) -> {reply, Response, State}. handle_delete_saved_request(CorrelationId, - State = #state{message_buffer = MessageBuffer}) -> - case lists:keytake(CorrelationId, 1, MessageBuffer) of + State = #state{saved_requests = Requests}) -> + case lists:keytake(CorrelationId, 1, Requests) of false -> {reply, {error, not_found}, State}; {value, {CorrelationId, _Bin, Messages}, NewMessageBuffer} -> diff --git a/src/kafkerl_connector.erl b/src/kafkerl_connector.erl index 914be2e..19913f7 100644 --- a/src/kafkerl_connector.erl +++ b/src/kafkerl_connector.erl @@ -31,7 +31,8 @@ config = [] :: {atom(), any()}, retry_on_topic_error = false :: boolean(), callbacks = [] :: [callback()], - known_topics = [] :: [binary()]}). + known_topics = [] :: [binary()], + pending = [] :: [basic_message()]}). -type state() :: #state{}. %%============================================================================== @@ -42,7 +43,7 @@ start_link(Name, Config) -> gen_server:start_link({local, Name}, ?MODULE, [Config], []). -spec send(server_ref(), basic_message()) -> ok | error(). -send(ServerRef, Message) when is_atom(ServerRef) -> +send(ServerRef, Message) -> send(ServerRef, Message, 1000). -spec send(server_ref(), basic_message(), integer() | infinity) -> ok | error(). send(ServerRef, Message, Timeout) -> @@ -78,7 +79,7 @@ request_metadata(ServerRef, Topics) -> -spec handle_call(any(), any(), state()) -> {reply, ok, state()} | {reply, {error, any()}, state()}. handle_call({send, Message}, _From, State) -> - {reply, handle_send(Message, State), State}; + handle_send(Message, State); handle_call({request_metadata}, _From, State) -> {reply, ok, handle_request_metadata(State, [])}; handle_call({request_metadata, Topics}, _From, State) -> @@ -92,7 +93,10 @@ handle_call({unsubscribe, Callback}, _From, State) -> handle_info(metadata_timeout, State) -> {stop, {error, unable_to_retrieve_metadata}, State}; -handle_info({metadata_updated, Mapping}, State) -> +handle_info({metadata_updated, []}, State) -> + % If the metadata arrived empty request it again + {noreply, handle_request_metadata(State, [])}; +handle_info({metadata_updated, Mapping}, State = #state{pending = Pending}) -> % Create the topic mapping (this also starts the broker connections) NewBrokerMapping = get_broker_mapping(Mapping, State), lager:debug("Refreshed topic mapping: ~p", [NewBrokerMapping]), @@ -107,10 +111,13 @@ handle_info({metadata_updated, Mapping}, State) -> NewTopics = lists:sort([T || {T, _P} <- PartitionData]), NewKnownTopics = lists:umerge(NewTopics, State#state.known_topics), lager:debug("Known topics: ~p", [NewKnownTopics]), - {noreply, - State#state{broker_mapping = NewBrokerMapping, - callbacks = NewCallbacks, - known_topics = NewKnownTopics}}; + NewState = State#state{broker_mapping = NewBrokerMapping, + callbacks = NewCallbacks, + known_topics = NewKnownTopics, + pending = []}, + F = fun(P) -> handle_send(P, NewState) end, + ok = lists:foreach(F, Pending), + {noreply, NewState}; handle_info(Msg, State) -> lager:notice("Unexpected info message received: ~p on ~p", [Msg, State]), {noreply, State}. @@ -156,16 +163,32 @@ init([Config]) -> {stop, bad_config} end. -handle_send(Message, #state{broker_mapping = Mapping}) -> +handle_send(Message, State = #state{broker_mapping = void, + pending = Pending}) -> + % If we are waiting for the metadata, just save the message and move on + % TODO: Using the buffer instead of a list in the gen_server will be safer + {reply, ok, State#state{pending = [Message | Pending]}}; +handle_send(Message, State = #state{broker_mapping = Mapping, + known_topics = KnownTopics, + retry_on_topic_error = RetryTopics}) -> {Topic, Partition, Payload} = Message, - case lists:keyfind({Topic, Partition}, 1, Mapping) of - false -> - lager:error("Dropping ~p sent to topic ~p, partition ~p, reason: ~p", - [Payload, Topic, Partition, no_broker]), - {error, invalid_topic_or_partition}; - {_, Broker} -> - kafkerl_broker_connection:send(Broker, Message) - end. + ok = case {lists:keyfind({Topic, Partition}, 1, Mapping), RetryTopics} of + {false, false} -> + % When retry topics is false, just fail + lager:error("Dropping ~p sent to topic ~p, partition ~p, reason: ~p", + [Payload, Topic, Partition, no_broker]), + {error, invalid_topic_or_partition}; + {false, true} -> + % Send the message to any broker, this will eventually trigger a new + % metadata request, there might be better ways of handling this, but + % you should not be constantly sending messages to new topics anyway + [{_, Broker} | _] = Mapping, + kafkerl_broker_connection:send(Broker, Message); + {{_, Broker}, _} -> + kafkerl_broker_connection:send(Broker, Message) + end, + NewKnownTopics = lists:umerge([Topic], KnownTopics), + {reply, ok, State#state{known_topics = NewKnownTopics}}. handle_get_partitions(#state{broker_mapping = void}) -> {error, not_available}; @@ -177,12 +200,12 @@ handle_request_metadata(State = #state{broker_mapping = void}, _Topics) -> State; handle_request_metadata(State, NewTopics) -> SortedNewTopics = lists:sort(NewTopics), - Request = metadata_request(State, SortedNewTopics), + NewKnownTopics = lists:umerge(State#state.known_topics, SortedNewTopics), + Request = metadata_request(State, NewKnownTopics), Params = [self(), State#state.brokers, get_metadata_tcp_options(), State#state.max_metadata_retries, State#state.retry_interval, Request], _Pid = spawn_link(?MODULE, request_metadata, Params), - NewKnownTopics = lists:umerge(State#state.known_topics, SortedNewTopics), State#state{broker_mapping = void, known_topics = NewKnownTopics}. %%============================================================================== diff --git a/src/kafkerl_protocol.erl b/src/kafkerl_protocol.erl index e4e98c6..1d9d857 100644 --- a/src/kafkerl_protocol.erl +++ b/src/kafkerl_protocol.erl @@ -106,8 +106,8 @@ build_produce_request({Topic, Partition, Messages}, Compression) -> TopicSize = byte_size(Topic), {Size, MessageSet} = build_message_set(Messages, Compression), {Size + TopicSize + 24, - [<<0:16/unsigned-integer, % RequiredAcks - -1:32/unsigned-integer, % Timeout + [<<-1:16/signed-integer, + -1:32/signed-integer, % Timeout 1:32/unsigned-integer, % TopicCount TopicSize:16/unsigned-integer>>, Topic, @@ -118,14 +118,12 @@ build_produce_request({Topic, Partition, Messages}, Compression) -> build_produce_request(Data, Compression) -> % Build the body of the request with multiple topics/partitions % (Docs at: http://goo.gl/J3C50c) - RequiredAcks = 0, - Timeout = -1, TopicCount = length(Data), {TopicsSize, Topics} = build_topics(Data, Compression), % 10 is the size of the header {TopicsSize + 10, - [<>, Topics]}. @@ -340,8 +338,8 @@ parse_produced_partitions(Count, <>, Acc) -> - Partition = {Partition, ErrorCode, Offset}, - parse_produced_partitions(Count - 1, Remainder, [Partition | Acc]). + PartitionData = {Partition, ErrorCode, Offset}, + parse_produced_partitions(Count - 1, Remainder, [PartitionData | Acc]). % Parse fetch response (http://goo.gl/eba5z3) From 3a0a1104ce82c92478e0f7cf1717634dcb789cf3 Mon Sep 17 00:00:00 2001 From: Hernan Rivas Acosta Date: Thu, 21 Aug 2014 16:25:18 -0300 Subject: [PATCH 5/5] fixed being forced to set a list of topics on the app config --- src/kafkerl_sup.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/kafkerl_sup.erl b/src/kafkerl_sup.erl index ad2de4f..240f0e7 100644 --- a/src/kafkerl_sup.erl +++ b/src/kafkerl_sup.erl @@ -34,8 +34,10 @@ get_connector_child_spec() -> _ -> kafkerl end, {ok, ConnConfig} = application:get_env(kafkerl, conn_config), - {ok, Topics} = application:get_env(kafkerl, topics), - + Topics = case application:get_env(kafkerl, topics) of + {ok, Any} -> Any; + undefined -> [] + end, Params = [Name, [{topics, Topics} | ConnConfig]], MFA = {kafkerl_connector, start_link, Params}, {Name, MFA, permanent, 2000, worker, [kafkerl_connector]}.