Error after upgrading to v0.3.2 #78

Closed
juanperi opened this issue Feb 22, 2022 · 7 comments · Fixed by #79 or #80

Comments

@juanperi
Contributor

Hey there, I seem to be facing an issue that I think got introduced in 595d7a2

Here, :brod.fetch is expected to return {:ok, _} or {:error, :offset_out_of_range}.
But the spec of :brod.fetch says the error tuple is {:error, any}, and that is what I seem to be hitting. What I get is:

{
  :error,
  [
    {
      {:"host-1.com", port1},
      {
        {
          {:kpro_req, #Reference<0.3873234864.3221487618.224752>, :api_versions, 0, false, []},
          :closed
        },
        [
          {:kpro_lib, :send_and_recv_raw, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_lib.erl', line: 70]},
          {:kpro_lib, :send_and_recv, 5, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_lib.erl', line: 81]},
          {:kpro_connection, :query_api_versions, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 246]},
          {:kpro_connection, :init_connection, 2, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 233]},
          {:kpro_connection, :init, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 170]},
          {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}
        ]
      }
    },
    {
      {:"host-1.com", port2},
      {
        {
          {:kpro_req, #Reference<0.3873234864.3221487618.224757>, :api_versions, 0, false, []},
          :closed
        },
        [
          {:kpro_lib, :send_and_recv_raw, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_lib.erl', line: 70]},
          {:kpro_lib, :send_and_recv, 5, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_lib.erl', line: 81]},
          {:kpro_connection, :query_api_versions, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 246]},
          {:kpro_connection, :init_connection, 2, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 233]},
          {:kpro_connection, :init, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 170]},
          {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}
        ]
      }
    }
  ]
}

I'm not familiar with the inner workings of this library, but should the error tuple be relaxed to any error tuple?
My setup works again if I downgrade to v0.3.1

Thank you!!
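
To make the mismatch concrete, here is a minimal sketch. `FetchSketch.strict/1` is a hypothetical name, the port number is a placeholder, and the function receives the fetch result as an argument instead of calling :brod.fetch, so this is not the library's actual code. It only shows that a `case` covering these two shapes raises a `CaseClauseError` for any other error tuple:

```elixir
defmodule FetchSketch do
  # Hypothetical stand-in for the validation step: `result` plays the role
  # of whatever :brod.fetch returned.
  def strict(result) do
    case result do
      {:ok, _} -> :offset_valid
      {:error, :offset_out_of_range} -> :offset_invalid
    end
  end
end

# A connection failure (shape simplified, port 9092 is a placeholder)
# matches neither clause, so the case expression raises.
try do
  FetchSketch.strict({:error, [{{:"host-1.com", 9092}, :closed}]})
rescue
  e in CaseClauseError -> IO.puts("no clause matched: " <> inspect(e.term))
end
```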

@josevalim
Member

Can you try replacing this line:

595d7a2#diff-b6e1efdb0ead446a507e7c9ce19ce2ef9575981439c063c0bea6cf5697711778R146

by:

_ ->

And let us know if it works?

@v0idpwn
Contributor

v0idpwn commented Feb 22, 2022

Which kafka version are you using?

@juanperi
Contributor Author

I already tried that, using {:error, _}, and with that it works.
What stopped me from creating a PR right away is that the current error tuple is very specific, and I don't know enough of the inner workings to know whether relaxing it is the right approach.
But if you are happy with it, I can send a PR with the change.
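
For reference, a minimal sketch of what the relaxed match looks like. `RelaxedSketch.relaxed/1` is a hypothetical name and the fetch result is passed in as an argument, so this is an illustration under those assumptions and not the library's code:

```elixir
defmodule RelaxedSketch do
  # Hypothetical stand-in: `result` plays the role of the :brod.fetch result.
  def relaxed(result) do
    case result do
      {:ok, _} -> :offset_valid
      # Every error tuple now lands here, whatever its reason.
      {:error, _} -> :offset_invalid
    end
  end
end

IO.inspect(RelaxedSketch.relaxed({:error, :offset_out_of_range}))
IO.inspect(RelaxedSketch.relaxed({:error, :closed}))
```

Note that with this shape a connection failure and a genuinely out-of-range offset become indistinguishable to the caller.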

@juanperi
Contributor Author

> Which kafka version are you using?

2.2.1

@juanperi
Contributor Author

juanperi commented Mar 1, 2022

Hey @josevalim, I think we need to re-open this issue.
After the fix, I noticed that I'm re-consuming all the messages in the topic instead of resuming from the offset where I left off.
The last version that works well for me is 0.3.0, because of this code: https://github.com/dashbitco/broadway_kafka/blob/v0.3.0/lib/broadway_kafka/brod_client.ex#L129

    if current_offset == :undefined do
      [...]
    else
      current_offset
    end

In that code, I already have an offset which, as far as I can tell, is valid, and that's why this worked up until 0.3.1 (where it broke).

Now, in later versions (let's use 0.3.3), having an offset in current_offset is not enough: it goes through one more validation step that manually calls :brod.fetch https://github.com/dashbitco/broadway_kafka/blob/v0.3.3/lib/broadway_kafka/brod_client.ex#L142
But for me, that call fails when it later tries to call kpro:connect_partition_leader https://github.com/kafka4beam/brod/blob/3.16.1/src/brod_utils.erl#L288

The error is the same one I reported at the top of this thread:

{
  :error,
  [
    {
      {:"host-1.com", port1},
      {
        {
          {:kpro_req, #Reference<0.3873234864.3221487618.224752>, :api_versions, 0, false, []},
          :closed
        },
        [
          {:kpro_lib, :send_and_recv_raw, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_lib.erl', line: 70]},
          {:kpro_lib, :send_and_recv, 5, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_lib.erl', line: 81]},
          {:kpro_connection, :query_api_versions, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 246]},
          {:kpro_connection, :init_connection, 2, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 233]},
          {:kpro_connection, :init, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 170]},
          {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}
        ]
      }
    },
    {
      {:"host-1.com", port2},
      {
        {
          {:kpro_req, #Reference<0.3873234864.3221487618.224757>, :api_versions, 0, false, []},
          :closed
        },
        [
          {:kpro_lib, :send_and_recv_raw, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_lib.erl', line: 70]},
          {:kpro_lib, :send_and_recv, 5, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_lib.erl', line: 81]},
          {:kpro_connection, :query_api_versions, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 246]},
          {:kpro_connection, :init_connection, 2, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 233]},
          {:kpro_connection, :init, 4, [file: '/Users/jperi/workspace/sylar/deps/kafka_protocol/src/kpro_connection.erl', line: 170]},
          {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}
        ]
      }
    }
  ]
}

Just to be clear, the actual consumption of messages is not broken; I can fetch records fine. But the offset is reset to 0 after each deploy.
So I assume there is something wrong in the way we now call :brod.fetch to validate the offset. It never reaches the point where it tries to retrieve the offset; it fails while establishing the connection.
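
One possible direction, sketched under assumptions: reset only when the error really is :offset_out_of_range, and keep the existing offset for any other error. `OffsetSketch.resolve/3` and its arguments are hypothetical names, the fetch result is passed in instead of calling :brod.fetch, and this is not a claim about how the eventual fix works:

```elixir
defmodule OffsetSketch do
  # All names are hypothetical. `fetch_result` stands in for the value
  # returned by :brod.fetch, `reset_offset` for whatever the reset policy yields.
  def resolve(current_offset, fetch_result, reset_offset) do
    case fetch_result do
      {:ok, _} -> current_offset
      {:error, :offset_out_of_range} -> reset_offset
      # Any other error (for example a failed connection) says nothing about
      # the offset itself, so this sketch keeps the existing one.
      {:error, _} -> current_offset
    end
  end
end

IO.inspect(OffsetSketch.resolve(42, {:error, :closed}, 0))
IO.inspect(OffsetSketch.resolve(42, {:error, :offset_out_of_range}, 0))
```

Whether silently keeping the offset on a connection error is acceptable, or whether the error should be surfaced instead, is a design choice this sketch does not settle.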

@josevalim josevalim reopened this Mar 1, 2022
@josevalim
Member

Thanks for the updates, a new PR is welcome!

@juanperi
Contributor Author

juanperi commented Mar 1, 2022

Awesome. I'll keep researching
