Skip to content

Commit

Permalink
Merge pull request #15 from HernanRivasAcosta/master
Browse files Browse the repository at this point in the history
Parity
  • Loading branch information
HernanRivasAcosta committed Sep 16, 2014
2 parents 77dceb1 + cdac166 commit b266ce5
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 4 deletions.
9 changes: 7 additions & 2 deletions src/kafkerl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
get_partitions/0, get_partitions/1,
subscribe/1, subscribe/2, subscribe/3,
unsubscribe/1, unsubscribe/2,
request_metadata/0, request_metadata/1, request_metadata/2]).
request_metadata/0, request_metadata/1, request_metadata/2,
valid_message/1]).

-include("kafkerl.hrl").
-include("kafkerl_consumers.hrl").
Expand Down Expand Up @@ -69,4 +70,8 @@ request_metadata(Topics) ->
request_metadata(?MODULE, Topics).
-spec request_metadata(atom(), [topic()]) -> ok.
request_metadata(Name, Topics) ->
kafkerl_connector:request_metadata(Name, Topics).
kafkerl_connector:request_metadata(Name, Topics).

-spec valid_message(any()) -> boolean().
valid_message(Any) ->
kafkerl_utils:valid_message(Any).
42 changes: 40 additions & 2 deletions src/kafkerl_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

-export([send_event/2, send_error/2]).
-export([get_tcp_options/1]).
-export([merge_messages/1]).
-export([merge_messages/1, split_messages/1, valid_message/1]).
-export([buffer_name/2]).

-include("kafkerl.hrl").
Expand Down Expand Up @@ -42,6 +42,27 @@ get_tcp_options(Options) -> % TODO: refactor
merge_messages(Topics) ->
merge_topics(Topics).

% Not as costly, but still avoid this in a place where performance is critical
-spec split_messages(merged_message()) -> [basic_message()].
split_messages({Topic, {Partition, Messages}}) ->
{Topic, Partition, Messages};
split_messages({Topic, Partitions}) ->
[{Topic, Partition, Messages} || {Partition, Messages} <- Partitions];
split_messages(Topics) ->
lists:flatten([split_messages(Topic) || Topic <- Topics]).

-spec valid_message(any()) -> boolean().
valid_message({Topic, Partition, Messages}) ->
is_binary(Topic) andalso is_integer(Partition) andalso Partition >= 0 andalso
(is_binary(Messages) orelse is_list_of_binaries(Messages));
valid_message({Topic, Partition}) ->
is_binary(Topic) andalso (is_partition(Partition) orelse
is_partition_list(Partition));
valid_message(L) when is_list(L) ->
lists:all(fun valid_message/1, L);
valid_message(_Any) ->
false.

-spec buffer_name(topic(), partition()) -> atom().
buffer_name(Topic, Partition) ->
Bin = <<Topic/binary, $., (integer_to_binary(Partition))/binary, "_buffer">>,
Expand All @@ -50,6 +71,7 @@ buffer_name(Topic, Partition) ->
%%==============================================================================
%% Utils
%%==============================================================================
%% Merge
merge_topics({Topic, Partition, Message}) ->
merge_topics([{Topic, Partition, Message}]);
merge_topics([{Topic, Partition, Message}]) ->
Expand Down Expand Up @@ -90,4 +112,20 @@ merge_messages(A, B) ->
{false, true} -> B ++ [A];
{true, false} -> [B | A];
{false, false} -> [B, A]
end.
end.

is_list_of_binaries(L) when is_list(L) ->
length(L) > 0 andalso lists:all(fun is_binary/1, L);
is_list_of_binaries(_Any) ->
false.

is_partition_list(L) when is_list(L) ->
length(L) > 0 andalso lists:all(fun is_partition/1, L);
is_partition_list(_Any) ->
false.

is_partition({Partition, Messages}) ->
is_integer(Partition) andalso Partition >= 0 andalso
(is_binary(Messages) orelse is_list_of_binaries(Messages));
is_partition(_Any) ->
false.

0 comments on commit b266ce5

Please sign in to comment.