Unable to connect to Azure EventHub using Kafka protocol #376

Closed
patrickmedina opened this issue May 14, 2021 · 2 comments · Fixed by #379

Comments

@patrickmedina

Description

RIG crashes when connecting to Azure EventHub, although other Kafka clients (e.g. kafkacat, node-rdkafka) connect fine. Azure EventHub supports clients via the Kafka protocol (see here). I think this is an issue with the brod_client library. Some forum posts suggest setting query_api_versions to false when setting up the brod_client. I tried setting it in config/config.exs, but with no luck.

How to reproduce

When I run RIG with these settings:

I ran the example in the 'examples/channels-example' folder with the following configuration changes in the app.docker-compose.yml file:
KAFKA_SOURCE_TOPICS=<eventhub name/topic>
KAFKA_BROKERS=mynamespace.servicebus.windows.net:9093
KAFKA_SASL=plain:$ConnectionString:Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=<KeyName>;SharedAccessKey=<AccessKey>
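The KAFKA_SASL value above packs three fields into one colon-separated string, and the password itself contains colons (the `sb://` part of the connection string). As a minimal sketch of how such a value can be split safely, the following shell snippet cuts only at the first two colons. This is an illustration only; it is an assumption that RIG splits the value in a comparable way, and the variable names `mechanism`, `username`, and `password` are hypothetical.

```shell
#!/bin/sh
# Value copied from the configuration above. Single quotes keep
# $ConnectionString literal instead of expanding it as a shell variable.
KAFKA_SASL='plain:$ConnectionString:Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=<KeyName>;SharedAccessKey=<AccessKey>'

# Hypothetical splitting logic (not taken from RIG): cut at the first two
# colons only, so the colons inside the password are preserved.
mechanism=${KAFKA_SASL%%:*}   # text before the first colon
rest=${KAFKA_SASL#*:}         # text after the first colon
username=${rest%%:*}          # text before the next colon
password=${rest#*:}           # everything else, colons included

printf 'mechanism=%s\n' "$mechanism"
printf 'username=%s\n' "$username"
printf 'password=%s\n' "$password"
```

Splitting on every colon instead would truncate the password at `Endpoint=sb`, which is worth ruling out when authentication fails.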

..and set up clients and services like this:

Same as the clients in examples/channels-example.

..I see the following error and/or log output:

11:16:35.969 module=gen_server [error] GenServer #PID<0.3033.0> terminating
** (stop) :client_down
Last message: {:DOWN, #Reference<0.3794099790.769130498.261897>, :process, {:"brod_client_114b7a82-0a45-416e-a880-3ed8e9598afa", :"rig@127.0.0.1"}, [{{"realtime-update.servicebus.windows.net", 9093}, {{{:kpro_req, #Reference<0.3794099790.769392641.115667>, :api_versions, 0, false, []}, :closed}, [{:kpro_lib, :send_and_recv_raw, 4, [file: 'src/kpro_lib.erl', line: 70]}, {:kpro_lib, :send_and_recv, 5, [file: 'src/kpro_lib.erl', line: 81]}, {:kpro_connection, :query_api_versions, 4, [file: 'src/kpro_connection.erl', line: 246]}, {:kpro_connection, :init_connection, 2, [file: 'src/kpro_connection.erl', line: 233]}, {:kpro_connection, :init, 4, [file: 'src/kpro_connection.erl', line: 170]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}]}}]}
State: {:state, :"brod_client_114b7a82-0a45-416e-a880-3ed8e9598afa", #Reference<0.3794099790.769130498.261897>, "rig-kafka-to-filter", :undefined, :undefined, #PID<0.3034.0>, [], [begin_offset: :latest], true, :undefined, RigKafka.Client.GroupSubscriber, %{callback: &Rig.EventStream.KafkaToFilter.kafka_handler/2, schema_registry_host: nil}, :message}

..but I really expected this:

Connecting to EventHub from a different client (kafkacat) succeeds:
kafkacat -b mynamespace.servicebus.windows.net:9093 \
-X security.protocol=sasl_ssl \
-X sasl.mechanism=PLAIN \
-X sasl.username='$ConnectionString' \
-X sasl.password='Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=<KeyName>;SharedAccessKey=<AccessKey>' \
-L

Metadata for all topics (from broker 0: sasl_ssl://mynamespace.servicebus.windows.net:9093/0):
1 brokers:
broker 0 at mynamespace.servicebus.windows.net:9093 (controller)
2 topics:
topic "mytopic" with 1 partitions:
partition 0, leader 0, replicas: , isrs:
topic "mytopic2" with 2 partitions:
partition 0, leader 0, replicas: , isrs:
partition 1, leader 0, replicas: , isrs:

Versions:

  • Host OS: Docker for Mac (MacBook Pro (16-inch, 2019))
  • Frontend: Chrome
  • RIG version as shown on startup: Reactive Interaction Gateway 3.0.0-alpha.2 [rig@127.0.0.1, ERTS 11.1.8, OTP 23]
@patrickmedina
Author

I tried setting query_api_versions to false in lib/rig_kafka/client.ex:152, but then hit a handshake failure while connecting: the connection is closed during the SASL handshake request, and RIG warns that SSL is not enabled.

diff --git a/lib/rig_kafka/client.ex b/lib/rig_kafka/client.ex
index be3af82e..1685fedf 100644
--- a/lib/rig_kafka/client.ex
+++ b/lib/rig_kafka/client.ex
@@ -152,6 +152,7 @@ defmodule RigKafka.Client do
       [
         endpoints: brokers,
         auto_start_producers: true,
+        query_api_versions: false,
         default_producer_config: []
       ]
       |> add_ssl_conf(ssl)

Here are the logs from RIG:

rig-dev    | ** (stop) exited in: :gen_server.call(:"brod_client_63510f32-dfd4-472e-8f7d-0f673acdf5fe", {:get_group_coordinator, "rig-proxy-response"}, :infinity)
rig-dev    |     ** (EXIT) [{{"realtime-update.servicebus.windows.net", 9093}, {{{:kpro_req, #Reference<0.2030329627.507248641.110356>, :sasl_handshake, 0, false, [[<<0, 5>>, "PLAIN"]]}, :closed}, [{:kpro_lib, :send_and_recv_raw, 4, [file: 'src/kpro_lib.erl', line: 70]}, {:kpro_lib, :send_and_recv, 5, [file: 'src/kpro_lib.erl', line: 81]}, {:kpro_sasl, :handshake, 6, [file: 'src/kpro_sasl.erl', line: 117]}, {:kpro_sasl, :auth, 7, [file: 'src/kpro_sasl.erl', line: 47]}, {:kpro_connection, :init_connection, 2, [file: 'src/kpro_connection.erl', line: 240]}, {:kpro_connection, :init, 4, [file: 'src/kpro_connection.erl', line: 170]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}]}}]
rig-dev    |     (stdlib 3.14) gen_server.erl:246: :gen_server.call/3
rig-dev    |     (brod 3.15.1) /opt/sites/rig/deps/brod/src/brod_client.erl:839: :brod_client.safe_gen_call/3
rig-dev    |     (brod 3.15.1) /opt/sites/rig/deps/brod/src/brod_group_coordinator.erl:446: anonymous fn/2 in :brod_group_coordinator.discover_coordinator/1
rig-dev    |     (brod 3.15.1) /opt/sites/rig/deps/brod/src/brod_group_coordinator.erl:446: :brod_group_coordinator.discover_coordinator/1
rig-dev    |     (brod 3.15.1) /opt/sites/rig/deps/brod/src/brod_group_coordinator.erl:540: :brod_group_coordinator.do_stabilize/3
rig-dev    |     (brod 3.15.1) /opt/sites/rig/deps/brod/src/brod_group_coordinator.erl:349: :brod_group_coordinator.handle_info/2
rig-dev    |     (stdlib 3.14) gen_server.erl:689: :gen_server.try_dispatch/4
rig-dev    |     (stdlib 3.14) gen_server.erl:765: :gen_server.handle_msg/6
rig-dev    | Last message: {:lo_cmd_stabilize, 0, :undefined}
rig-dev    | State: {:state, :"brod_client_63510f32-dfd4-472e-8f7d-0f673acdf5fe", "rig-proxy-response", "", :undefined, 0, ["rig-proxy-response"], :undefined, :undefined, [], false, #PID<0.4004.0>, :brod_group_subscriber, [], :undefined, :roundrobin_v2, 30, 5, 5, 1, :undefined, :commit_to_kafka_v2, 5, :roundrobin_v2}
rig-dev    | 02:56:34.090 module=RigKafka.Client [warn] SASL is enabled, but SSL is not - credentials are transmitted as cleartext.
rig-dev    | 02:56:34.091 module=RigKafka.Client [warn] SASL is enabled, but SSL is not - credentials are transmitted as cleartext.
rig-dev    | 02:56:34.602 module=RigKafka.Client [warn] SASL is enabled, but SSL is not - credentials are transmitted as cleartext.
rig-dev    | 02:56:35.676 [warn] :brod_client [#PID<0.4019.0>] :"brod_client_63510f32-dfd4-472e-8f7d-0f673acdf5fe" is terminating
rig-dev    | reason: [
rig-dev    |   {{"realtime-update.servicebus.windows.net", 9093},
rig-dev    |    {{{:kpro_req, #Reference<0.2030329627.506986500.232614>, :sasl_handshake, 0,
rig-dev    |       false, [[<<0, 5>>, "PLAIN"]]}, :closed},
rig-dev    |     [
rig-dev    |       {:kpro_lib, :send_and_recv_raw, 4, [file: 'src/kpro_lib.erl', line: 70]},
rig-dev    |       {:kpro_lib, :send_and_recv, 5, [file: 'src/kpro_lib.erl', line: 81]},
rig-dev    |       {:kpro_sasl, :handshake, 6, [file: 'src/kpro_sasl.erl', line: 117]},
rig-dev    |       {:kpro_sasl, :auth, 7, [file: 'src/kpro_sasl.erl', line: 47]},
rig-dev    |       {:kpro_connection, :init_connection, 2,
rig-dev    |        [file: 'src/kpro_connection.erl', line: 240]},
rig-dev    |       {:kpro_connection, :init, 4, [file: 'src/kpro_connection.erl', line: 170]},
rig-dev    |       {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}
rig-dev    |     ]}}
rig-dev    | ]
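
When comparing the two traces, the useful detail is which Kafka request was pending when the broker closed the connection: `:api_versions` in the first trace, `:sasl_handshake` in this one. A minimal sketch for pulling that name out of a trace line follows; the `failed_request` helper is hypothetical, and it assumes the trace contains a tuple of the form `{:kpro_req, #Reference<...>, :<api>, ...}` as in the logs above.

```shell
#!/bin/sh
# Hypothetical helper: print the API name of the request that was in flight
# when the connection closed. Prints nothing if no :kpro_req tuple is found.
failed_request() {
  printf '%s\n' "$1" |
    sed -n 's/.*{:kpro_req, #Reference<[^>]*>, :\([a-z_]*\),.*/\1/p'
}

# Excerpts copied from the two traces in this issue.
first='{{{:kpro_req, #Reference<0.3794099790.769392641.115667>, :api_versions, 0, false, []}, :closed}'
second='{{{:kpro_req, #Reference<0.2030329627.507248641.110356>, :sasl_handshake, 0, false, [[<<0, 5>>, "PLAIN"]]}, :closed}'

failed_request "$first"    # request pending before the patch
failed_request "$second"   # request pending with query_api_versions: false
```

With query_api_versions disabled, the failure only moves to the next request sent on the connection, which suggests the broker rejects the connection itself rather than one specific request.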

@patrickmedina
Author

@kevinbader / @mmacai,

Tested #379 and confirmed it's working. I can now connect to Azure EventHub directly from RIG.
