diff --git a/src/kafkerl.erl b/src/kafkerl.erl index 333b84b..80b101e 100644 --- a/src/kafkerl.erl +++ b/src/kafkerl.erl @@ -6,7 +6,8 @@ get_partitions/0, get_partitions/1, subscribe/1, subscribe/2, subscribe/3, unsubscribe/1, unsubscribe/2, - request_metadata/0, request_metadata/1, request_metadata/2]). + request_metadata/0, request_metadata/1, request_metadata/2, + valid_message/1]). -include("kafkerl.hrl"). -include("kafkerl_consumers.hrl"). @@ -69,4 +70,8 @@ request_metadata(Topics) -> request_metadata(?MODULE, Topics). -spec request_metadata(atom(), [topic()]) -> ok. request_metadata(Name, Topics) -> - kafkerl_connector:request_metadata(Name, Topics). \ No newline at end of file + kafkerl_connector:request_metadata(Name, Topics). + +-spec valid_message(any()) -> boolean(). +valid_message(Any) -> + kafkerl_utils:valid_message(Any). \ No newline at end of file diff --git a/src/kafkerl_utils.erl b/src/kafkerl_utils.erl index 7d28f53..bbcd149 100644 --- a/src/kafkerl_utils.erl +++ b/src/kafkerl_utils.erl @@ -3,7 +3,7 @@ -export([send_event/2, send_error/2]). -export([get_tcp_options/1]). --export([merge_messages/1]). +-export([merge_messages/1, split_messages/1, valid_message/1]). -export([buffer_name/2]). -include("kafkerl.hrl"). @@ -42,6 +42,27 @@ get_tcp_options(Options) -> % TODO: refactor merge_messages(Topics) -> merge_topics(Topics). +% Not as costly, but still avoid this in a place where performance is critical +-spec split_messages(merged_message()) -> [basic_message()]. +split_messages({Topic, {Partition, Messages}}) -> + {Topic, Partition, Messages}; +split_messages({Topic, Partitions}) -> + [{Topic, Partition, Messages} || {Partition, Messages} <- Partitions]; +split_messages(Topics) -> + lists:flatten([split_messages(Topic) || Topic <- Topics]). + +-spec valid_message(any()) -> boolean(). +valid_message({Topic, Partition, Messages}) -> + is_binary(Topic) andalso is_integer(Partition) andalso Partition >= 0 andalso + (is_binary(Messages) orelse is_list_of_binaries(Messages)); +valid_message({Topic, Partition}) -> + is_binary(Topic) andalso (is_partition(Partition) orelse + is_partition_list(Partition)); +valid_message(L) when is_list(L) -> + lists:all(fun valid_message/1, L); +valid_message(_Any) -> + false. + -spec buffer_name(topic(), partition()) -> atom(). buffer_name(Topic, Partition) -> Bin = <>, @@ -50,6 +71,7 @@ buffer_name(Topic, Partition) -> %%============================================================================== %% Utils %%============================================================================== +%% Merge merge_topics({Topic, Partition, Message}) -> merge_topics([{Topic, Partition, Message}]); merge_topics([{Topic, Partition, Message}]) -> @@ -90,4 +112,20 @@ merge_messages(A, B) -> {false, true} -> B ++ [A]; {true, false} -> [B | A]; {false, false} -> [B, A] - end. \ No newline at end of file + end. + +is_list_of_binaries(L) when is_list(L) -> + length(L) > 0 andalso lists:all(fun is_binary/1, L); +is_list_of_binaries(_Any) -> + false. + +is_partition_list(L) when is_list(L) -> + length(L) > 0 andalso lists:all(fun is_partition/1, L); +is_partition_list(_Any) -> + false. + +is_partition({Partition, Messages}) -> + is_integer(Partition) andalso Partition >= 0 andalso + (is_binary(Messages) orelse is_list_of_binaries(Messages)); +is_partition(_Any) -> + false.