-
Notifications
You must be signed in to change notification settings - Fork 13
/
convenience.ex
69 lines (53 loc) · 2.04 KB
/
convenience.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
defmodule Kayrock.Convenience do
@moduledoc """
Convenience functions for working with Kayrock / Kafka API data
"""
alias Kayrock.ErrorCode
def topic_exists?(pid, topic) when is_pid(pid) do
{:ok, [topic]} = Kayrock.topics_metadata(pid, [topic])
topic[:error_code] != ErrorCode.unknown_topic()
end
def partition_last_offset(client_pid, topic, partition) do
%{^partition => offset} = partitions_last_offset(client_pid, topic, [partition])
offset
end
@doc """
Returns a map of partition to offset for partitions on the same leader
All partitions must be on the same leader or you will get -1 for the offset
"""
def partitions_last_offset(client_pid, topic, partitions) do
[first_partition | _] = partitions
partition_requests =
for partition <- partitions do
%{partition: partition, timestamp: -1}
end
request = %Kayrock.ListOffsets.V1.Request{
replica_id: -1,
topics: [%{topic: topic, partitions: partition_requests}]
}
{:ok, resp} =
Kayrock.client_call(client_pid, request, {:topic_partition, topic, first_partition})
[topic_resp] = resp.responses
Enum.into(topic_resp.partition_responses, %{}, fn %{partition: partition, offset: offset} ->
{partition, offset}
end)
end
def topic_last_offsets(client_pid, topic) do
partition_leaders = get_partition_leaders(client_pid, topic)
partitions_by_node =
Enum.reduce(partition_leaders, %{}, fn {partition, node}, acc ->
Map.update(acc, node, [partition], fn existing_partitions ->
[partition | existing_partitions]
end)
end)
Enum.reduce(partitions_by_node, %{}, fn {_, partitions}, acc ->
Map.merge(acc, partitions_last_offset(client_pid, topic, partitions))
end)
end
def get_partition_leaders(client_pid, topic) do
{:ok, [metadata]} = Kayrock.topics_metadata(client_pid, [topic])
Enum.into(metadata.partition_metadata, %{}, fn partition_metadata ->
{partition_metadata.partition, partition_metadata.leader}
end)
end
end