Add support for timestamp offset_reset_policy #137

Merged
17 changes: 16 additions & 1 deletion lib/broadway_kafka/brod_client.ex
@@ -169,6 +169,13 @@ defmodule BroadwayKafka.BrodClient do

   defp lookup_offset(hosts, topic, partition, policy, client_config) do
     case :brod.resolve_offset(hosts, topic, partition, policy, client_config) do
+      {:ok, -1} ->
+        # `:brod.resolve_offset` returns -1 when asked to resolve a timestamp newer
+        # than all the messages in the partition.
+        # -1 is not a valid offset you can use with `:brod.fetch` so we need to
+        # resolve the latest offset instead
+        lookup_offset(hosts, topic, partition, :latest, client_config)
Member

Would anyone want to reset to earliest instead? Maybe it should be {:timestamp, timestamp, :earliest | :latest}?

Contributor Author

Following the docs here: https://github.com/kafka4beam/brod/blob/master/src/brod.erl#L1125-L1157

> The function returns the offset of the first message
> with the given timestamp, or of the first message after
> the given timestamp

I assumed the logical conclusion was to go for the latest, because it's the one after the timestamp you asked for.

Happy to add the option if you think it's best.

Member

Makes sense. Fine by me as is then. :)

Contributor Author

Awesome ❤️, let me know if anything else needs to be done before this can be merged.


       {:ok, offset} ->
         offset
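
The fallback in this hunk can be tried in isolation. What follows is a minimal sketch, not the library's code: `OffsetFallbackSketch` and its `resolver` argument are hypothetical stand-ins for `BroadwayKafka.BrodClient` and `:brod.resolve_offset/5`, so the example runs without a Kafka broker. The `policy != :latest` guard is an assumption added here so the sketch cannot recurse forever if the stand-in keeps returning -1.

```elixir
defmodule OffsetFallbackSketch do
  @moduledoc false

  # `resolver` is a hypothetical 1-arity function standing in for
  # `:brod.resolve_offset/5`; it returns {:ok, offset} or {:error, reason}.
  def lookup_offset(resolver, policy) do
    case resolver.(policy) do
      {:ok, -1} when policy != :latest ->
        # -1 means there is no message at or after the requested timestamp,
        # so resolve the latest offset instead.
        lookup_offset(resolver, :latest)

      {:ok, offset} ->
        offset

      {:error, reason} ->
        raise "cannot resolve offset for #{inspect(policy)}: #{inspect(reason)}"
    end
  end
end

# Fake partition: the newest message has timestamp 1_000 and the
# latest offset is 42.
resolver = fn
  :latest -> {:ok, 42}
  {:timestamp, ts} when ts > 1_000 -> {:ok, -1}
  {:timestamp, _ts} -> {:ok, 7}
end

# Timestamp newer than every message: falls back to the latest offset.
IO.inspect(OffsetFallbackSketch.lookup_offset(resolver, {:timestamp, 2_000}))
# Timestamp inside the partition: the resolved offset is used directly.
IO.inspect(OffsetFallbackSketch.lookup_offset(resolver, {:timestamp, 500}))
```

The first call prints 42 and the second prints 7, which matches the behaviour agreed on in the review thread: a timestamp past the end of the partition behaves like `:latest`.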

@@ -246,11 +253,16 @@ defmodule BroadwayKafka.BrodClient do
   defp validate_option(:offset_commit_on_ack, value) when not is_boolean(value),
     do: validation_error(:offset_commit_on_ack, "a boolean", value)

+  defp validate_option(:offset_reset_policy, {:timestamp, timestamp})
+       when is_integer(timestamp) and timestamp > 0 do
+    {:ok, {:timestamp, timestamp}}
+  end
+
   defp validate_option(:offset_reset_policy, value)
        when value not in @offset_reset_policy_values do
     validation_error(
       :offset_reset_policy,
-      "one of #{inspect(@offset_reset_policy_values)}",
+      "one of #{inspect(@offset_reset_policy_values)} or `{:timestamp, timestamp}` where timestamp is a non-negative integer",
       value
     )
   end
@@ -397,6 +409,9 @@ defmodule BroadwayKafka.BrodClient do

       :latest ->
         -1
+
+      {:timestamp, timestamp} when is_integer(timestamp) and timestamp >= 0 ->
+        timestamp
     end
   end

4 changes: 2 additions & 2 deletions lib/broadway_kafka/producer.ex
@@ -40,8 +40,8 @@ defmodule BroadwayKafka.Producer do
       since only one commit request will be performed per batch.

     * `:offset_reset_policy` - Optional. Defines the offset to be used when there's no initial
-      offset in Kafka or if the current offset has expired. Possible values are `:earliest` or
-      `:latest`. Default is `:latest`.
+      offset in Kafka or if the current offset has expired. Possible values are `:earliest`,
+      `:latest` or {:timestamp, timestamp} (in milliseconds). Default is `:latest`.

     * `:begin_offset` - Optional. Defines how to get the initial offset for the consumers.
       The possible values are `:assigned` or `:reset`. When set to `:assigned` the starting offset will be the
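
As an illustration of the documented option, the sketch below builds a `{:timestamp, timestamp}` policy in milliseconds and places it in a producer configuration. `TimestampPolicySketch`, `policy_since/2` and `producer_opts/1` are hypothetical helpers, and the host, group id and topic are placeholder values; only the option names come from the BroadwayKafka documentation. The resulting keyword list is what would be passed under `:producer` to `Broadway.start_link/2`.

```elixir
defmodule TimestampPolicySketch do
  @moduledoc false

  # Builds an `:offset_reset_policy` value pointing `seconds` before `now`.
  # The timestamp is expressed in milliseconds, as the option docs state.
  def policy_since(seconds, now \\ DateTime.utc_now()) do
    timestamp_ms =
      now
      |> DateTime.add(-seconds, :second)
      |> DateTime.to_unix(:millisecond)

    {:timestamp, timestamp_ms}
  end

  # Producer options in the shape BroadwayKafka.Producer expects.
  # Host, group id and topic are placeholders for this sketch.
  def producer_opts(policy) do
    [
      module:
        {BroadwayKafka.Producer,
         [
           hosts: [localhost: 9092],
           group_id: "example_group",
           topics: ["example_topic"],
           offset_reset_policy: policy
         ]},
      concurrency: 1
    ]
  end
end

# Start consuming from messages produced during the last hour whenever
# the consumer group has no committed offset or the offset has expired.
policy = TimestampPolicySketch.policy_since(3600)
IO.inspect(TimestampPolicySketch.producer_opts(policy))
```

Passing `now` explicitly keeps the helper deterministic in tests, while the default of `DateTime.utc_now()` is convenient in application code.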
2 changes: 1 addition & 1 deletion test/brod_client_test.exs
@@ -115,7 +115,7 @@ defmodule BroadwayKafka.BrodClientTest do

       assert BrodClient.init(opts) ==
                {:error,
-                "expected :offset_reset_policy to be one of [:earliest, :latest], got: :an_atom"}
+                "expected :offset_reset_policy to be one of [:earliest, :latest] or `{:timestamp, timestamp}` where timestamp is a non-negative integer, got: :an_atom"}

       opts = Keyword.put(@opts, :offset_reset_policy, :earliest)
       assert {:ok, [], %{offset_reset_policy: :earliest}} = BrodClient.init(opts)