Skip to content

Commit

Permalink
Add support for timestamp offset_reset_policy (#137)
Browse files Browse the repository at this point in the history
  • Loading branch information
danmarcab committed Mar 6, 2024
1 parent 3f2ab90 commit b77f92e
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
17 changes: 16 additions & 1 deletion lib/broadway_kafka/brod_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ defmodule BroadwayKafka.BrodClient do

defp lookup_offset(hosts, topic, partition, policy, client_config) do
case :brod.resolve_offset(hosts, topic, partition, policy, client_config) do
{:ok, -1} ->
# `:brod.resolve_offset` returns -1 when asked to resolve a timestamp newer
# than all the messages in the partition.
# -1 is not a valid offset you can use with `:brod.fetch` so we need to
# resolve the latest offset instead
lookup_offset(hosts, topic, partition, :latest, client_config)

{:ok, offset} ->
offset

Expand Down Expand Up @@ -246,11 +253,16 @@ defmodule BroadwayKafka.BrodClient do
defp validate_option(:offset_commit_on_ack, value) when not is_boolean(value),
do: validation_error(:offset_commit_on_ack, "a boolean", value)

defp validate_option(:offset_reset_policy, {:timestamp, timestamp})
when is_integer(timestamp) and timestamp > 0 do
{:ok, {:timestamp, timestamp}}
end

defp validate_option(:offset_reset_policy, value)
when value not in @offset_reset_policy_values do
validation_error(
:offset_reset_policy,
"one of #{inspect(@offset_reset_policy_values)}",
"one of #{inspect(@offset_reset_policy_values)} or `{:timestamp, timestamp}` where timestamp is a non-negative integer",
value
)
end
Expand Down Expand Up @@ -397,6 +409,9 @@ defmodule BroadwayKafka.BrodClient do

:latest ->
-1

{:timestamp, timestamp} when is_integer(timestamp) and timestamp >= 0 ->
timestamp
end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/broadway_kafka/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ defmodule BroadwayKafka.Producer do
since only one commit request will be performed per batch.
* `:offset_reset_policy` - Optional. Defines the offset to be used when there's no initial
offset in Kafka or if the current offset has expired. Possible values are `:earliest` or
`:latest`. Default is `:latest`.
offset in Kafka or if the current offset has expired. Possible values are `:earliest`,
`:latest` or {:timestamp, timestamp} (in milliseconds). Default is `:latest`.
* `:begin_offset` - Optional. Defines how to get the initial offset for the consumers.
The possible values are `:assigned` or `:reset`. When set to `:assigned` the starting offset will be the
Expand Down
2 changes: 1 addition & 1 deletion test/brod_client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ defmodule BroadwayKafka.BrodClientTest do

assert BrodClient.init(opts) ==
{:error,
"expected :offset_reset_policy to be one of [:earliest, :latest], got: :an_atom"}
"expected :offset_reset_policy to be one of [:earliest, :latest] or `{:timestamp, timestamp}` where timestamp is a non-negative integer, got: :an_atom"}

opts = Keyword.put(@opts, :offset_reset_policy, :earliest)
assert {:ok, [], %{offset_reset_policy: :earliest}} = BrodClient.init(opts)
Expand Down

0 comments on commit b77f92e

Please sign in to comment.