Skip to content

Commit

Permalink
Use reset_policy only when offset is undefined or out_of_range (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexandrexaviersm committed Feb 15, 2022
1 parent 1847a3e commit 595d7a2
Showing 1 changed file with 12 additions and 23 deletions.
35 changes: 12 additions & 23 deletions lib/broadway_kafka/brod_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -134,29 +134,18 @@ defmodule BroadwayKafka.BrodClient do

@impl true
def resolve_offset(topic, partition, current_offset, offset_reset_policy, config) do
valid_offset_range =
@offset_reset_policy_values
|> Map.new(fn semantic_offset ->
{semantic_offset,
lookup_offset(
config.hosts,
topic,
partition,
offset_reset_policy_value(offset_reset_policy),
config.client_config
)}
end)

case current_offset do
:undefined ->
valid_offset_range[offset_reset_policy]

value ->
if valid_offset_range[:earliest] <= value and value <= valid_offset_range[:latest] do
value
else
valid_offset_range[offset_reset_policy]
end
policy = offset_reset_policy_value(offset_reset_policy)

if current_offset == :undefined do
lookup_offset(config.hosts, topic, partition, policy, config.client_config)
else
case :brod.fetch(config.hosts, topic, partition, current_offset) do
{:ok, _} ->
current_offset

{:error, :offset_out_of_range} ->
lookup_offset(config.hosts, topic, partition, policy, config.client_config)
end
end
end

Expand Down

0 comments on commit 595d7a2

Please sign in to comment.