diff --git a/lib/broadway_kafka/producer/buffer.ex b/lib/broadway_kafka/producer/buffer.ex index c51dcda..ca23ba0 100644 --- a/lib/broadway_kafka/producer/buffer.ex +++ b/lib/broadway_kafka/producer/buffer.ex @@ -29,11 +29,13 @@ defmodule BroadwayKafka.Producer.Buffer do @spec enqueue_with_key(t, key, list :: [Message.t()], at :: :rear | :front) :: t @doc false + def enqueue_with_key(buffer, key, list, at \\ :rear) + def enqueue_with_key(buffer, _key, [], _at) do buffer end - def enqueue_with_key(buffer, key, list, at \\ :rear) do + def enqueue_with_key(buffer, key, list, at) do if at == :rear do :queue.in({key, list}, buffer) else @@ -85,19 +87,19 @@ defmodule BroadwayKafka.Producer.Buffer do end end - def split(list, count) do + defp split(list, count) do split_list(list, count, nil, 0, []) end - def split_list(rest, 0, last_item, length_of_new_list, acc) do + defp split_list(rest, 0, last_item, length_of_new_list, acc) do {Enum.reverse(acc), last_item, length_of_new_list, rest} end - def split_list([], _count, last_item, length_of_new_list, acc) do + defp split_list([], _count, last_item, length_of_new_list, acc) do {Enum.reverse(acc), last_item, length_of_new_list, []} end - def split_list([head | tail], count, _last_item, length_of_new_list, acc) do + defp split_list([head | tail], count, _last_item, length_of_new_list, acc) do split_list(tail, count - 1, head, length_of_new_list + 1, [head | acc]) end end