Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop support of topic/partition for topics option #22

Closed
gabrielgiordan opened this issue Mar 11, 2020 · 3 comments
Closed

Drop support of topic/partition for topics option #22

gabrielgiordan opened this issue Mar 11, 2020 · 3 comments

Comments

@gabrielgiordan
Copy link
Contributor

Given the following exception trying to connect to Kafka with a keyword list of topic/partition [test: n], I think that we maybe should support only a list of topics:

09:48:54.434 [info]  [supervisor: {:local, :brod_sup}, started: [pid: #PID<0.241.0>, id: Consumer.Broadway.Producer_0.Client, mfargs: {:brod_client, :start_link, [["##############.aws.confluent.cloud": 9092], Consumer.Broadway.Producer_0.Client, [sasl: {:plain, "#################", #Function<11.113019958/0 in :brod_utils.init_sasl_opt/1>}, ssl: [cacertfile: "./priv/ssl/server.crt", keyfile: "./priv/ssl/server.key", certfile: "./priv/ssl/server.crt"]]]}, restart_type: {:permanent, 10}, shutdown: 5000, child_type: :worker]]
Interactive Elixir (1.9.4) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> 
09:48:56.857 [info]  Group member (test,coor=#PID<0.245.0>,cb=#PID<0.240.0>,generation=0):
Leaving group, reason: {{:function_clause, [:topics, :protocol_metadata, :group_protocols, {:join_group, 0}], [group_id: "test", session_timeout: 30000, member_id: "", protocol_type: "consumer", group_protocols: [[protocol_name: :roundrobin_v2, protocol_metadata: [version: 0, topics: [test: 0], user_data: ""]]]]}, [{:kpro_lib, :encode, [:string, {:test, 0}], []}, {:kpro_req_lib, :enc_struct_field, 3, [file: 'src/kpro_req_lib.erl', line: 398]}, {:kpro_req_lib, :"-enc_struct_field/3-lc$^0/1-0-", 3, [file: 'src/kpro_req_lib.erl', line: 389]}, {:kpro_req_lib, :enc_struct_field, 3, [file: 'src/kpro_req_lib.erl', line: 389]}, {:kpro_req_lib, :enc_struct, 3, [file: 'src/kpro_req_lib.erl', line: 379]}, {:kpro_req_lib, :enc_struct, 3, [file: 'src/kpro_req_lib.erl', line: 380]}, {:kpro_req_lib, :translate, 2, [file: 'src/kpro_req_lib.erl', line: 410]}, {:kpro_req_lib, :enc_struct, 3, [file: 'src/kpro_req_lib.erl', line: 378]}]}

 
09:48:56.863 [error] GenServer #PID<0.245.0> terminating
** (stop) {:function_clause, [:topics, :protocol_metadata, :group_protocols, {:join_group, 0}], [group_id: "test", session_timeout: 30000, member_id: "", protocol_type: "consumer", group_protocols: [[protocol_name: :roundrobin_v2, protocol_metadata: [version: 0, topics: [test: 0], user_data: ""]]]]}
    (kafka_protocol) :kpro_lib.encode(:string, {:test, 0})
    (kafka_protocol) src/kpro_req_lib.erl:398: :kpro_req_lib.enc_struct_field/3
    (kafka_protocol) src/kpro_req_lib.erl:389: :kpro_req_lib."-enc_struct_field/3-lc$^0/1-0-"/3
    (kafka_protocol) src/kpro_req_lib.erl:389: :kpro_req_lib.enc_struct_field/3
    (kafka_protocol) src/kpro_req_lib.erl:379: :kpro_req_lib.enc_struct/3
    (kafka_protocol) src/kpro_req_lib.erl:380: :kpro_req_lib.enc_struct/3
    (kafka_protocol) src/kpro_req_lib.erl:410: :kpro_req_lib.translate/2
    (kafka_protocol) src/kpro_req_lib.erl:378: :kpro_req_lib.enc_struct/3
Last message: {:lo_cmd_stabilize, 0, :undefined}
State: {:state, Consumer.Broadway.Producer_0.Client, "test", "", :undefined, 0, [test: 0], :undefined, :undefined, [], false, #PID<0.240.0>, BroadwayKafka.Producer, [], :undefined, :roundrobin_v2, 30, 5, 5, 2, :undefined, :commit_to_kafka_v2, 1, :roundrobin_v2} 

-type topic() :: kpro:topic().

https://github.com/klarna/brod/blob/master/src/brod.erl

-type topic() :: binary().

https://github.com/klarna/kafka_protocol/blob/master/src/kpro.erl

@msaraiva
Copy link
Collaborator

@gabrielgiordan I'm not sure if passing a keyword list has ever worked. We certainly should only accept [binary()]. Thanks for reporting this.

@msaraiva msaraiva changed the title Support of topic/partition option Drop support of topic/partition for topics option Mar 27, 2020
@lud
Copy link

lud commented Jul 5, 2021

Hi,

Is there a way to configure broadway_kafka to listen to a single, known, partition? I can't find in the Broadway docs if Broadway can be used with distributed elixir. To me it looks like Broadway is not distributed, but in that case broadway_kafka must listen to a single partition when used in a cluster, am I right?

@v0idpwn
Copy link
Contributor

v0idpwn commented Jul 6, 2021

I may be wrong, but I think :brod_group_coordinator supports custom partition assignment through the assign_partitions/2 callback. BroadwayKafka currently doesn't let you configure that out of the box. I think that it should, as this seems like an important use-case.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants