
External service for kafka not working #13

Closed
juanli2 opened this issue Nov 18, 2016 · 76 comments

@juanli2

juanli2 commented Nov 18, 2016

Hi,
I tried to use a NodePort type to expose the Kafka service externally.
Attached is my service yml file.

And here's the service description:
$ kubectl describe svc kafka -n kafka
Name: kafka
Namespace: kafka
Labels:
Selector: app=kafka
Type: NodePort
IP: 100.73.225.207
Port: 9092/TCP
NodePort: 32093/TCP
Endpoints: 10.44.0.10:9092,10.44.0.11:9092,10.44.0.12:9092
Session Affinity: None

But when I try to use port 32093 externally to connect to the Kafka service, it doesn't seem to work.

$ ./bin/kafka-console-producer.sh --broker-list ${master-ip}:32093 --topic test2
abc
[2016-11-18 15:26:58,157] ERROR Error when sending message to topic test2 with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test2-0 due to 1512 ms has passed since batch creation plus linger time

I'm pretty sure that the connection to the master-ip works, and port 32093 is listening in the cluster.

It works for ZooKeeper, but I'm not sure why Kafka isn't working.

@solsson
Contributor

solsson commented Nov 18, 2016

The example kafka service 30service.yml doesn't use NodePort, so it will only be accessible within the cluster. Port will be 9092 but not exposed on nodes.

If you modified the service, what kubernetes environment do you use? Does it require a LoadBalancer type service? This repository has only focused on kafka access within the cluster.

Have you tried any of the pods in https://github.com/Yolean/kubernetes-kafka/tree/master/test ?
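For reference, a NodePort variant of the in-cluster kafka service would look roughly like this. This is only an illustrative sketch based on the port and selector shown in the describe output above, not taken from the repo or the attached file:

apiVersion: v1
kind: Service
metadata:
  name: kafka
  namespace: kafka
spec:
  type: NodePort              # exposes the port on every node
  selector:
    app: kafka
  ports:
  - port: 9092
    targetPort: 9092
    nodePort: 32093           # must fall within the cluster's NodePort range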

@juanli2
Author

juanli2 commented Nov 21, 2016

Access from within the cluster is OK; I just want to test exposing the port externally. It's in my local lab, not in the cloud, so I use the NodePort type. It works well for the ZooKeeper NodePort, but it fails when accessed from outside.

What's your suggestion if I want to expose port 9092 externally?

@juanli2
Author

juanli2 commented Nov 21, 2016

[2016-11-21 15:09:38,373] WARN [console-consumer-72367_JUANLI2-M-G38Q-1479712160584-889927b7-leader-finder-thread], Failed to find leader for Set(test1-0) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test1)] from broker [ArrayBuffer(BrokerEndPoint(1,kafka-1.broker.kafka.svc.cluster.local,9092), BrokerEndPoint(2,kafka-2.broker.kafka.svc.cluster.local,9092), BrokerEndPoint(0,kafka-0.broker.kafka.svc.cluster.local,9092))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
... 3 more

It seems I need a LoadBalancer for those brokers.

@donstaheli

A Kafka client that supports broker redirection will work in this scenario; just map the three internal broker endpoints to the one used in your connection string.

@vhsanche

vhsanche commented Mar 22, 2017

I'm having the same issue. @juanli2, were you able to figure out how to expose the Kafka service using NodePort?

@solsson
Contributor

solsson commented Mar 22, 2017

I'm not sure the kafka protocol allows such a thing. See an ongoing investigation in #21. I think consumers must be able to address each broker separately, though they can get the list of brokers from one or more "bootstrap" names. Kubernetes services are inherently round-robin.
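Concretely, using the in-cluster names from this thread (a sketch): the bootstrap connection may go through the round-robin service, but the metadata it returns points at individual brokers, which the client then contacts directly.

# Bootstrap can use the (round-robin) service name:
./bin/kafka-console-consumer.sh --bootstrap-server kafka.kafka.svc.cluster.local:9092 --topic test2
# ...but the metadata response lists per-broker addresses such as
# kafka-0.broker.kafka.svc.cluster.local:9092, and the client connects to those directly.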

@donstaheli

I was able to get around this using broker redirection in no-kafka. What happens is that upon connection to a broker, it sends a list of known brokers, which then replaces whatever endpoint you used to connect. These are the Kubernetes cluster endpoints, however, which will not be the same as what the consumer connects with. The broker redirection feature lets you map the external endpoint to the internal one.

@vhsanche

Ahh right on, let me try that, and will report back!

@solsson
Contributor

solsson commented Mar 23, 2017

@donstaheli Have you tried that setup with a topic that has fewer replicas than you have brokers?

@donstaheli

No, I haven't. What is the use case demanding that can't be met with partitioning?

@solsson
Contributor

solsson commented Mar 25, 2017

It's more of a gotcha than a use case. In #21 we suspect that when consumers can't resolve individual brokers (internal or external services alike) they will not know how to locate a topic unless it lives on the broker they happen to hit.

I don't think a client lib can solve that, because k8s services are round-robin. There's a discussion about that in kubernetes/kubernetes#36415.

@vhsanche

Sorry for the late update, but I looked at broker redirection in no-kafka and it's not what I'm looking for. So my question is similar to @juanli2's. Let me explain. I used the repo to create the ZooKeeper and Kafka StatefulSets, and I was successful creating everything as advertised. I am able to access Kafka either by hitting broker-0.broker.... and so forth, or by using kafka.default.svc.... I then went ahead and modified 30service.yml with type: NodePort and attempted to hit this service from outside the k8s cluster, and I get the same error @juanli2 reported. If I understand NodePort services correctly, this should work? Btw, my k8s cluster is not in a cloud environment. Any ideas?

@solsson
Contributor

solsson commented Apr 12, 2017

I think the no-kafka docs on remapping confirm my position that Kafka isn't meant to be used through a round-robin load balancer. It might work within a cluster though, where you can - depending on config - resolve the addresses to individual brokers that you get from your initial connection through the service.

@solsson
Contributor

solsson commented Apr 12, 2017

You could try one service + NodePort per kafka broker, together with remapping.
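A minimal sketch of one such per-broker service, assuming the broker pods carry a per-broker label (here called kafka-broker-id, as in later comments in this thread); repeat with a different name and nodePort per broker:

apiVersion: v1
kind: Service
metadata:
  name: outside-0
  namespace: kafka
spec:
  type: NodePort
  selector:
    app: kafka
    kafka-broker-id: "0"      # assumes a per-broker pod label exists
  ports:
  - port: 32400
    targetPort: 9094          # the broker's outside listener port (9092 if there is only one listener)
    nodePort: 32400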

@MilanDasek

Guys,

have you figured out how to connect to kafka using AWS classic ELB?

I receive
kafkacat -b %DNS-NAME%:9092 -L
% ERROR: Failed to acquire metadata: Local: Timed out

I have TCP mapping on nginx ingress of port 9092 to service kafka/kafka:9092

data:
  "2181": kafka/zookeeper:2181
  "9092": kafka/kafka:9092
kind: ConfigMap

Also zookeeper is not accessible from ELB.

Internally it works fine:
./bin/kafka-topics.sh --zookeeper zookeeper:2181 --topic test2 --create --partitions 1 --replication-factor 3
./bin/kafka-console-producer.sh --broker-list kafka-0.broker:9092,kafka-1.broker:9092,kafka-2.broker:9092 --topic test2
./bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic test2 --from-beginning

thanks for any help

@BenjaminDavison

Has anyone ever got this working with an external service?

@solsson
Contributor

solsson commented Jul 20, 2017

I don't think so. With a single broker it might be straightforward. With >1 you'd probably have to expose each broker externally and set advertised.listeners individually.
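A rough sketch of what "individually" could mean here, assuming one external port per broker (e.g. 3240 + broker id) and some routable host; EXTERNAL_HOST and the port scheme are placeholders, not from this repo:

# In each broker's start command, derive the id from the StatefulSet hostname
# and advertise a per-broker external address:
KAFKA_BROKER_ID=${HOSTNAME##*-}
./bin/kafka-server-start.sh config/server.properties \
  --override broker.id=${KAFKA_BROKER_ID} \
  --override advertised.listeners=PLAINTEXT://${EXTERNAL_HOST}:3240${KAFKA_BROKER_ID}
# Note: brokers still need an internal listener they can reach each other on
# (see the following comments).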

@BenjaminDavison

So I managed to get messages coming in, using a setup like this:

apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  namespace: kafka
spec:
  ports:
  - port: 9092
    name: kafka-port
    targetPort: 9092
    protocol: TCP
  selector:
    app: kafka
  type: LoadBalancer

And changing 50kafka.yml (note I am using your 0.11 branch of the project)

          ./bin/kafka-server-start.sh
          config/server.properties
          --override log.retention.hours=-1
          --override log.dirs=/var/lib/kafka/data/topics
          --override broker.id=${HOSTNAME##*-}
          --override advertised.listeners=PLAINTEXT://foo-bar.us-west-2.elb.amazonaws.com:9092
          --override default.replication.factor=3
          --override num.partitions=3

I can connect externally and produce messages, but for some reason the Kafka cluster under-replicates topics and I get a lot of this in the Kafka logs:

(kafka.server.ReplicaFetcherThread)
[2017-07-20 11:47:56,940] INFO [ReplicaFetcherThread-0-0]: Retrying leaderEpoch request for partition san1-2 as the leader reported an error: UNKNOWN_TOPIC_OR_PARTITION 

Do you have any ideas why this might be happening?

Ben

@solsson
Contributor

solsson commented Jul 20, 2017

I guess brokers won't be able to resolve each other at the .elb.amazonaws.com name. Maybe you could add the local name as well to advertised.listeners? Using ${HOSTNAME##*-} to make it specific per broker pod. I think producers will be ok even if they only find one of the listeners.

@BenjaminDavison

BOSH!

That was it, but instead of adding it to advertised.listeners I used listeners (listeners is for internal brokers), so it now looks like this:

          ./bin/kafka-server-start.sh
          config/server.properties
          --override log.retention.hours=-1
          --override auto.leader.rebalance.enable=true
          --override log.dirs=/var/lib/kafka/data/topics
          --override broker.id=${HOSTNAME##*-}
          --override advertised.listeners=PLAINTEXT://foo-bar.us-west-2.elb.amazonaws.com:9092
          --override listeners=PLAINTEXT://${HOSTNAME##*-}:9092
          --override default.replication.factor=3
          --override num.partitions=3

Output from sangrenel before (note the error rate):

2017/07/20 11:59:35 [ topic: logs ]
> Messages: 31Mb/sec @ 13501 msgs/sec. | error rate 47.41%
> Batches: 27.00 batches/sec. | 2.15903s p99 | 547.176ms HMean | 332.936ms Min | 2.167833s Max
332.936ms - 516.425ms --------------------------------------------------
516.425ms - 699.915ms -
699.915ms - 883.405ms -
883.405ms - 1.066895s -
1.066895s - 1.250384s -
1.250384s - 1.433874s --------
1.433874s - 1.617364s -
1.617364s - 1.800854s -------------------------------
1.800854s - 1.984343s -
1.984343s - 2.167833s -

After:

> Messages: 75Mb/sec @ 32701 msgs/sec. | error rate 0.00%
> Batches: 65.40 batches/sec. | 536.672ms p99 | 376.556ms HMean | 335.445ms Min | 552.701ms Max
2017/07/20 13:48:59 [ topic: san2 ]
335.445ms - 357.171ms ----------
357.171ms - 378.897ms --------------------------------------------------
378.897ms - 400.622ms ----------------
400.622ms - 422.348ms -
422.348ms - 444.073ms -
444.073ms - 465.799ms -
465.799ms - 487.524ms -
487.524ms - 509.25ms -
509.25ms - 530.975ms --
530.975ms - 552.701ms --

So the only thing left is to make advertised.listeners get its value from the service (which I can work out).
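One way to pick that value up, sketched under the assumption that an init script can run kubectl and has RBAC permission to read the kafka-service LoadBalancer defined above:

# Hypothetical lookup of the ELB hostname assigned to the LoadBalancer service:
ELB_HOST=$(kubectl -n kafka get svc kafka-service \
  -o jsonpath='{.status.loadBalancer.ingress[0].hostname}')
# ...then substitute it into the start command:
# --override advertised.listeners=PLAINTEXT://${ELB_HOST}:9092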

@solsson
Contributor

solsson commented Jul 20, 2017

Nice!

Why is it --override listeners=PLAINTEXT://${HOSTNAME##*-}:9092? I would have guessed instead --override listeners=PLAINTEXT://kafka-${HOSTNAME##*-}:9092.

@BenjaminDavison

BenjaminDavison commented Jul 20, 2017 via email

@solsson
Contributor

solsson commented Jul 21, 2017

two of the Kafka brokers kept dying....

Note that in the kafka-011 branch I'm experimenting with resource limits, and they frequently cause OOMKilled. I paused that work to do solsson/dockerfiles#5 before I try more tuning.

@BenjaminDavison

BenjaminDavison commented Jul 25, 2017

Got it working 😄

Changed the command: to:

          --override log.retention.hours=-1
          --override log.dirs=/var/lib/kafka/data/topics
          --override broker.id=${HOSTNAME##*-}
          --override listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
          --override advertised.listeners="INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT://$(eval wget -t3 -T2 -qO-  http://169.254.169.254/latest/meta-data/public-hostname):9093"
          --override listeners=INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:9093
          --override inter.broker.listener.name=INTERNAL_PLAINTEXT
          --override auto.create.topics.enable=true # Just our internal config
          --override auto.leader.rebalance.enable=true # Just our internal config
          --override num.partitions=3 # Just our internal config
          --override default.replication.factor=3 # Just our internal config

You then need to change the ports: to this:

        ports:
        - containerPort: 9092
        - containerPort: 9093
          hostPort: 9093

Finally you need to open up security groups if needed. You can then connect using ec2-blah.us-blah.com:9093 without dropping any messages:

> Messages: 50Mb/sec @ 21901 msgs/sec. | error rate 0.00%
> Batches: 43.80 batches/sec. | 979.979ms p99 | 545.219ms HMean | 341.991ms Min | 1.080252s Max
341.991ms - 415.817ms --
415.817ms - 489.643ms --------------------------------------------------
489.643ms - 563.469ms -------------------------------------
563.469ms - 637.295ms ------------------------------
637.295ms - 711.121ms ----------
711.121ms - 784.947ms ----------
784.947ms - 858.774ms ---
 858.774ms - 932.6ms -
 932.6ms - 1.006426s -
1.006426s - 1.080252s -

So this is just for AWS, but for anything else just change this line: EXTERNAL_PLAINTEXT://$(eval wget -t3 -T2 -qO- http://169.254.169.254/latest/meta-data/public-hostname):9093 to get a routable hostname.
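For clusters without the EC2 metadata endpoint, one hedged alternative is to hand each pod its node's address via the downward API and advertise that instead (a sketch; status.hostIP is the node's IP and must be routable from your clients):

# In the broker container spec:
env:
- name: OUTSIDE_HOST
  valueFrom:
    fieldRef:
      fieldPath: status.hostIP   # node IP; swap in a routable name if you have one
# ...and advertise EXTERNAL_PLAINTEXT://${OUTSIDE_HOST}:9093 in the start command.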

Thanks,

Ben

@solsson
Contributor

solsson commented Jul 26, 2017

I'd like to make an "addon" PR with this, but let me get the 0.11 branch ready for merge first.

What kind of access control do you plan on using? I'm curious if it can affect the choice of listeners.

@BenjaminDavison

For us at the moment, PLAINTEXT, but we will be moving towards a more secure method soon(ish)

@yevgeniyo

yevgeniyo commented Jul 31, 2017

Hi @solsson, as I understand it you have added @BenjaminDavison's code, so there is no longer any need to add all the --override options he did; just modify 10broker-config.yml.
Is that correct?
Can you please explain how I can connect to the Kafka cluster using an AWS load balancer?
I'd appreciate it.

Is my configuration correct:

listeners=INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:9093

advertised.listeners="INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT://$(eval wget -t3 -T2 -qO- http://169.254.169.254/latest/meta-data/public-hostname):9093"

listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT

Because I get this exception:

2017-07-31T13:42:34.268980421Z java.lang.IllegalArgumentException: Error creating broker listeners from '"INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT://$(eval wget -t3 -T2 -qO- http://169.254.169.254/latest/meta-data/public-hostname):9093"': Unable to parse "INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:9092 to a broker endpoint

@comdw

comdw commented Oct 16, 2017

Bah, looks like Kafka schema registry doesn't support the listener security protocol map

No endpoints found for security protocol [PLAINTEXT]. Endpoints found in ZK [{EXTERNAL_PLAINTEXT=kafkaserver-0:32090, INTERNAL_PLAINTEXT=kafka-0.broker.default.svc.cluster.local:9092}]

@solsson
Contributor

solsson commented Oct 16, 2017

Maybe it doesn't need to? It should always be on the inside, I guess. Can we simply rearrange the list?

Edit: ah, just saw that that error can't be from the PR. The PR keeps PLAINTEXT for inside, hence schema registry might be unaffected.

@comdw

comdw commented Oct 16, 2017

Sorry, yes, that was slightly off topic, as it looks like just a schema registry limitation at the moment. I've worked around it for now by configuring my listeners as "PLAINTEXT" and "EXTERNAL_PLAINTEXT".
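In properties form that workaround amounts to keeping the internal listener under the default PLAINTEXT name, roughly like this (a sketch; ports and hostnames are assumed from the comments above, and <external-host> is a placeholder):

listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
listeners=PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka-0.broker.default.svc.cluster.local:9092,EXTERNAL_PLAINTEXT://<external-host>:32090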

@comdw

comdw commented Oct 18, 2017

... however the separate external/named listeners approach will not work for all Kafka clients that are external to the k8s cluster. E.g. Kafka Tool: it's able to obtain the Kafka cluster info via ZK, but errors when trying to load topics and consumers, complaining that it can't reach the broker. I suspect this is because it's trying to connect specifically to the listener named "PLAINTEXT" as discovered from ZK (rather like schema registry does, but that's ok if it's running in the same k8s cluster), which can't be accessed from outside. Kafka Tool worked with a single node and one advertised listener address:port that was reachable both inside and outside.

My own producer/consumer clients running outside the cluster do work with a broker/bootstrap config that initially talks to any broker, and somehow the response is clever enough to advertise only the external addresses at that point. I'm still not confident I fully understand what's going on in these different cases though!

I could perhaps rework configuration to go back to using the same FQDN and port internally and externally when I have time to experiment.

@solsson
Contributor

solsson commented Oct 18, 2017

as discovered from ZK

I think that type of client is deprecated. Maybe the introduction of multiple listeners was a reason for such a deprecation. I guess the host used as bootstrap must be matched to one of the listener types internally, in order to return broker hosts of the same kind. Also I don't see a reason why Zookeeper should have an external service.

@nadirabid

nadirabid commented Oct 29, 2017

The only reason I saw the need to expose zookeeper was because we're using Sarama for the broker clients and there's no way to create topics from that client. So we were going to use the kafka tools (which last I checked did need access to Zookeeper).

We've gone with a different approach entirely (a REST based service) which allows us more control over that process. Exposing Zookeeper outside of Kubernetes seems to be fraught with security issues that I'd rather not entertain.

@MacTynow

MacTynow commented Nov 1, 2017

If you're working on this and don't understand why your hostPort is not appearing in netstat, and you're provisioning your cluster using kops in an internal network, you might be running into kubernetes/kops#3132. This is if you're using Calico, Flannel or any of those.

Edit: Calico actually now enables portmap by default, so it works.

@solsson solsson added this to the v3.0 milestone Nov 7, 2017
@solsson
Contributor

solsson commented Nov 7, 2017

Fixed with the merge of #78.

Along these lines I've been thinking that it would be quite easy to set up kafka access over one kubectl port-forward per broker.

@styk-tv

styk-tv commented May 22, 2018

I solved it in the following way, using port-forward and without having to expose kafka to the outside world (a tiny modification of the solsson/kafka setup).

My init.sh ConfigMap:

      OUTSIDE_HOST=localhost
      if [ $? -ne 0 ]; then
        echo "Outside (i.e. cluster-external access) host lookup command failed"
      else
        OUTSIDE_PORT=3240${KAFKA_BROKER_ID}
        sed -i "s|#init#advertised.listeners=OUTSIDE://#init#|advertised.listeners=OUTSIDE://${OUTSIDE_HOST}:${OUTSIDE_PORT}|" /etc/kafka/server.properties
        ANNOTATIONS="$ANNOTATIONS kafka-listener-outside-host=$OUTSIDE_HOST kafka-listener-outside-port=$OUTSIDE_PORT"
      fi

The broker exposes 9094, and you get OUTSIDE addresses of your nodes with ports 3240 + "ID OF BROKER", so in the case of a 3-node setup this would be 32400, 32401, 32402.

Then I create 4 port-forwards:

kubectl -n kafka port-forward pod/kafka-0 9094
kubectl -n kafka port-forward pod/kafka-0 32400:9092
kubectl -n kafka port-forward pod/kafka-1 32401:9092
kubectl -n kafka port-forward pod/kafka-2 32402:9092

This way, when a client asks a broker at localhost:9094 for the list of nodes, it will say: hey, I have 3 nodes, and their addresses are as follows:

localhost:32400
localhost:32401
localhost:32402

Which is correct, because port-forward maps them correctly from your local machine. And you're ready to debug.

The producer is set to localhost:9094. The consumers are set to the high ports, with the client id matching the last digit of the high port, so client-0 goes to 32400, client-1 to 32401 and client-2 to 32402, just so I can make sense of which one is which. I used 3 consumers to test group consumption and it works fine. To be honest I just want to debug my code and then run everything inside against the standard 9092, but it took me way too much time to solve this (thanks to this thread as well).

The inside/outside setup is very cool, but I think many people are confused or don't know that the broker setting is just for getting a list of nodes, and that that list comes from how you launch the kafka nodes themselves. Inside/outside mapped to different ports, 9092 (regular) and 9094 (outside), is a good way to have a dual node list available. It would be beneficial to document the [outside] ability so people can write and debug code against kafka from their local workstations.
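For reference, the sed above effectively leaves the broker with properties along these lines (a sketch, assuming the image's #init# placeholder for the OUTSIDE listener and an internal listener on 9092):

listener.security.protocol.map=PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
listeners=PLAINTEXT://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094
advertised.listeners=PLAINTEXT://kafka-0.broker.kafka.svc.cluster.local:9092,OUTSIDE://localhost:32400
inter.broker.listener.name=PLAINTEXT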

@solsson
Contributor

solsson commented May 22, 2018

@styk-tv Thanks for sharing this. I've also struggled with Kafka's listener model. I've set up mirror-maker using docker-compose with the 9092 ports by mapping brokers' kubernetes names to local containers. See https://gist.github.com/solsson/22e773ab148596ec39d54b7cc153ee67. Like your solution it is based on kubectl port-forward, which is complicated in containers until kubernetes/kubernetes#46517 gets merged.

With #172 it could be easier. Using dynamic broker configuration ("Updating Broker Configs" in the Kafka docs), #78 can probably be implemented (and customized) in a way that is persisted in ZooKeeper and takes precedence over server.properties.

@hamshif

hamshif commented Sep 4, 2018

Is there a branch with external broker endpoints?

@solsson
Contributor

solsson commented Sep 5, 2018

@hamshif #195 and the issue it references sum up the situation quite well, I think.

@hamshif

hamshif commented Sep 5, 2018

@solsson thanks for the answer.
I followed the instructions in the outside-services README on local Docker Kubernetes and couldn't implement the dynamic externalIP part, because the local Docker node has no externalIP attribute!
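A quick way to check which address types a node actually reports (a generic check, not specific to this repo):

kubectl get nodes -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.status.addresses}{"\n"}{end}'
# On a local Docker Kubernetes node the list typically contains only InternalIP and Hostname, no ExternalIP.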

@hamshif

hamshif commented Sep 5, 2018

On the same deployment I have another service configured similarly, which I can access from a browser and with curl, so I suspect that it might be a Kafka-related issue and not Kubernetes.

@hamshif

hamshif commented Sep 5, 2018

curl -s http://localhost:30120/sso/secured/healthcheck/status
UP%

➜ ~ kubectl describe svc sso
Name:                     sso
Namespace:                default
Labels:                   app=sso
Annotations:              <none>
Selector:                 app=sso,app_group=rtp,tier=backend
Type:                     NodePort
IP:                       10.104.249.237
LoadBalancer Ingress:     localhost
Port:                     sso-debug  8000/TCP
TargetPort:               8000/TCP
NodePort:                 sso-debug  30121/TCP
Endpoints:                10.1.3.104:8000
Port:                     sso  8080/TCP
TargetPort:               8080/TCP
NodePort:                 sso  30120/TCP
Endpoints:                10.1.3.104:8080
Session Affinity:         None
External Traffic Policy:  Cluster
Events:                   <none>

➜ ~ kubectl describe svc outside-0 --namespace kafka
Name:                     outside-0
Namespace:                kafka
Labels:                   <none>
Annotations:              kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"name":"outside-0","namespace":"kafka"},"spec":{"ports":[{"nodePort":32400,"port":3240...
Selector:                 app=kafka,kafka-broker-id=0
Type:                     NodePort
IP:                       10.100.78.46
LoadBalancer Ingress:     localhost
Port:                     <unset>  32400/TCP
TargetPort:               9094/TCP
NodePort:                 <unset>  32400/TCP
Endpoints:                10.1.3.100:9094
Session Affinity:         None
External Traffic Policy:  Cluster
Events:                   <none>

The services follow the same pattern.

@hamshif

hamshif commented Sep 5, 2018

You can see a full description here:

@hamshif

hamshif commented Sep 5, 2018

This port responds differently to curl:

➜ ~ curl http://localhost:32400
curl: (52) Empty reply from server
➜ ~ curl http://localhost:32406
curl: (7) Failed to connect to localhost port 32406: Connection refused

@hamshif

hamshif commented Sep 5, 2018

But a producer can't connect.

@hamshif

hamshif commented Sep 5, 2018

I deployed to GKE too, on different nodes, to no avail.

@ridicule777

Hi,

I have a problem exposing my Kafka broker externally using the classic load balancer.

Here is my startup command:

kafka-server-start.sh /etc/kafka/server.properties \
--override broker.id=${HOSTNAME##*-} \
--override listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT \
--override advertised.listeners="EXTERNAL_PLAINTEXT://$(eval wget -t3 -T2 -qO-  http://169.254.169.254/latest/meta-data/public-hostname):9092,INTERNAL_PLAINTEXT://${HOSTNAME}.broker.kafka.svc.cluster.local:29092" \
--override advertised.host.name="$(eval wget -t3 -T2 -qO-  http://169.254.169.254/latest/meta-data/public-hostname)" \
--override advertised.port=9092 \
--override listeners=INTERNAL_PLAINTEXT://0.0.0.0:29092,EXTERNAL_PLAINTEXT://0.0.0.0:9092 \
--override inter.broker.listener.name=INTERNAL_PLAINTEXT \
--override auto.create.topics.enable=true \
--override auto.leader.rebalance.enable=true \
--override zookeeper.connect=pzoo:2181 \
--override background.threads=10 \
--override compression.type=producer \
--override delete.topic.enable=false \
--override leader.imbalance.check.interval.seconds=300 \
--override leader.imbalance.per.broker.percentage=10 \
--override message.max.bytes=1000012 \
--override min.insync.replicas=1 \
--override num.io.threads=8 \
--override num.network.threads=3 \
--override num.recovery.threads.per.data.dir=1 \
--override num.replica.fetchers=1 \
--override offset.metadata.max.bytes=4096 \
--override offsets.commit.required.acks=-1 \
--override offsets.commit.timeout.ms=5000 \
--override offsets.load.buffer.size=5242880 \
--override offsets.retention.check.interval.ms=600000 \
--override offsets.retention.minutes=10080 \
--override offsets.topic.compression.codec=0 \
--override offsets.topic.num.partitions=50 \
--override offsets.topic.replication.factor=1 \
--override offsets.topic.segment.bytes=104857600 \
--override queued.max.requests=500 \
--override quota.consumer.default=9223372036854775807 \
--override quota.producer.default=9223372036854775807 \
--override replica.fetch.min.bytes=1 \
--override replica.fetch.wait.max.ms=500 \
--override replica.high.watermark.checkpoint.interval.ms=5000 \
--override replica.lag.time.max.ms=10000 \
--override replica.socket.receive.buffer.bytes=65536 \
--override replica.socket.timeout.ms=80000 \
--override request.timeout.ms=80000 \
--override socket.receive.buffer.bytes=102400 \
--override socket.request.max.bytes=104857600 \
--override socket.send.buffer.bytes=102400 \
--override unclean.leader.election.enable=false \
--override zookeeper.session.timeout.ms=6000 \
--override zookeeper.set.acl=false \
--override broker.id.generation.enable=true \
--override connections.max.idle.ms=600000 \
--override controlled.shutdown.enable=true \
--override controlled.shutdown.max.retries=3 \
--override controlled.shutdown.retry.backoff.ms=5000 \
--override controller.socket.timeout.ms=30000 \
--override default.replication.factor=1 \
--override fetch.purgatory.purge.interval.requests=1000 \
--override group.max.session.timeout.ms=300000 \
--override group.min.session.timeout.ms=6000 \
--override inter.broker.protocol.version=2.0-IV1 \
--override log.retention.hours=-1 \
--override log.dirs=/var/lib/kafka/data/topics \
--override log.cleaner.backoff.ms=15000 \
--override log.cleaner.dedupe.buffer.size=134217728 \
--override log.cleaner.delete.retention.ms=86400000 \
--override log.cleaner.enable=true \
--override log.cleaner.io.buffer.load.factor=0.9 \
--override log.cleaner.io.buffer.size=524288 \
--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
--override log.cleaner.min.cleanable.ratio=0.5 \
--override log.cleaner.min.compaction.lag.ms=0 \
--override log.cleaner.threads=1 \
--override log.cleanup.policy=delete \
--override log.index.interval.bytes=4096 \
--override log.index.size.max.bytes=10485760 \
--override log.message.timestamp.difference.max.ms=9223372036854775807 \
--override log.message.timestamp.type=CreateTime \
--override log.preallocate=false \
--override log.retention.check.interval.ms=300000 \
--override max.connections.per.ip=2147483647 \
--override num.partitions=1 \
--override producer.purgatory.purge.interval.requests=1000 \
--override replica.fetch.backoff.ms=1000 \
--override replica.fetch.max.bytes=1048576 \
--override replica.fetch.response.max.bytes=10485760 \
--override reserved.broker.max.id=1000 \
--override log.flush.interval.messages=9223372036854775807 \
--override log.flush.offset.checkpoint.interval.ms=60000 \
--override log.flush.scheduler.interval.ms=9223372036854775807 \
--override log.retention.bytes=-1 \
--override log.retention.hours=168 \
--override log.roll.hours=168 \
--override log.roll.jitter.hours=0 \
--override log.segment.bytes=1073741824 \
--override log.segment.delete.delay.ms=60000 \
--override zookeeper.connection.timeout.ms=6000

My StatefulSet manifest:

---
apiVersion: v1
kind: Service
metadata:
  name: broker
  namespace: kafka
spec:
  ports:
  - port: 29092
    name: internal
  # [podname].broker.kafka.svc.cluster.local
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: kafka-persistent-storage
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  storageClassName: fasta
---
kind: Service
apiVersion: v1
metadata:
  name: kafka-svc
  annotations:
    dns.alpha.kubernetes.io/external: kafka.preprod.example.com
    service.beta.kubernetes.io/aws-load-balancer-additional-resource-tags: "Name=kafka, Environment=preprod"
spec:
  externalTrafficPolicy: Local
  ports:
  - port: 9092
  selector:
    app: kafka
  type: LoadBalancer
  sessionAffinity: None
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
spec:
  selector:
    matchLabels:
      app: kafka
  serviceName: "broker"
  replicas: 1
  updateStrategy:
    type: OnDelete
  template:
    metadata:
      labels:
        app: kafka
      annotations:
    spec:
      terminationGracePeriodSeconds: 30
      initContainers:
      - name: init-config
        image: hub.example.com/kafka:v1
        env:
        - name: NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        command: ['/bin/bash', '/etc/kafka-configmap/init.sh']
        volumeMounts:
        - name: configmap
          mountPath: /etc/kafka-configmap
        - name: config
          mountPath: /etc/kafka
      containers:
      - name: broker
        image: hub.example.com/kafka-preprod:v8
        env:
        - name: KAFKA_LOG4J_OPTS
          value: -Dlog4j.configuration=file:/etc/kafka/log4j.properties
        - name: JMX_PORT
          value: "5555"
        ports:
        - name: inside
          containerPort: 29092
        - name: outside
          containerPort: 9092
        - name: jmx
          containerPort: 5555
        command: ['/bin/bash', '/opt/kafka/bin/kafka-endpoint']
        resources:
          requests:
            cpu: 100m
            memory: 512Mi
        readinessProbe:
          tcpSocket:
            port: 9092
          timeoutSeconds: 1
        volumeMounts:
        - name: config
          mountPath: /etc/kafka
        - name: kafka-persistent-storage
          mountPath: /var/lib/kafka/data
      imagePullSecrets:
        - name: nexus
      volumes:
      - name: configmap
        configMap:
          name: broker-config
      - name: config
        emptyDir: {}
  volumeClaimTemplates:
  - metadata:
      name: kafka-persistent-storage
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: fasta
      resources:
        requests:
          storage: 10Gi

command: ['/bin/bash', '/opt/kafka/bin/kafka-endpoint'] is a bash script which contains the Kafka startup command with all the above-mentioned override options.

Now when I try to consume messages via the external DNS name kafka.preprod.example.com:9092, I get the error:

kafkacat -C -b kafka.preprod.example.com:9092 -t testing -v        Thu Nov 22 13:22:32 2018
% ERROR: Local: Broker transport failure: ec2-11-222-33-444.us-east-2.compute.amazonaws.com:9092/0: Connect to ipv4#11.222.33.444:9092 failed: Operation timed out (after 75477ms in state CONNECT)

and when I try to publish a message, I get an error:

kafkacat -P -b kafka.preprod.example.com:9092 -t testing                                  Thu Nov 22 13:19:15 2018
%4|1542873132.444|REQTMOUT|rdkafka#producer-1| [thrd:kafka.preprod.example.com:9092/bootstrap]: kafka.preprod.example.com:9092/bootstrap: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%4|1542873138.451|REQTMOUT|rdkafka#producer-1| [thrd:kafka.preprod.example.com:9092/bootstrap]: kafka.preprod.example.com:9092/bootstrap: Timed out 0 in-flight, 60 retry-queued, 0 out-queue, 0 partially-sent requests
%4|1542873144.470|REQTMOUT|rdkafka#producer-1| [thrd:kafka.preprod.example.com:9092/bootstrap]: kafka.preprod.example.com:9092/bootstrap: Timed out 0 in-flight, 60 retry-queued, 0 out-queue, 0 partially-sent requests

Also, I do not see any errors in my kafka logs.

(kafka.server.KafkaConfig)
[2018-11-22 07:48:26,100] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2018-11-22 07:48:26,100] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2018-11-22 07:48:26,105] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2018-11-22 07:48:26,136] INFO Loading logs. (kafka.log.LogManager)
[2018-11-22 07:48:26,199] INFO [Log partition=testing-2, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,204] INFO [Log partition=testing-2, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,233] INFO [Log partition=testing-2, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,235] INFO [Log partition=testing-2, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 63 ms (kafka.log.Log)
[2018-11-22 07:48:26,255] INFO [Log partition=testing-0, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,255] INFO [Log partition=testing-0, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,256] INFO [Log partition=testing-0, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,257] INFO [Log partition=testing-0, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 3 ms (kafka.log.Log)
[2018-11-22 07:48:26,262] INFO [Log partition=testing-4, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,262] INFO [Log partition=testing-4, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,264] INFO [Log partition=testing-4, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,264] INFO [Log partition=testing-4, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 3 ms (kafka.log.Log)
[2018-11-22 07:48:26,271] INFO [Log partition=testing-1, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,271] INFO [Log partition=testing-1, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,272] INFO [Log partition=testing-1, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,273] INFO [Log partition=testing-1, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 4 ms (kafka.log.Log)
[2018-11-22 07:48:26,278] INFO [Log partition=testing-5, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,279] INFO [Log partition=testing-5, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,280] INFO [Log partition=testing-5, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,280] INFO [Log partition=testing-5, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 3 ms (kafka.log.Log)
[2018-11-22 07:48:26,288] INFO [Log partition=testing-3, dir=/var/lib/kafka/data/topics] Recovering unflushed segment 0 (kafka.log.Log)
[2018-11-22 07:48:26,288] INFO [Log partition=testing-3, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,290] INFO [Log partition=testing-3, dir=/var/lib/kafka/data/topics] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2018-11-22 07:48:26,290] INFO [Log partition=testing-3, dir=/var/lib/kafka/data/topics] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 3 ms (kafka.log.Log)
[2018-11-22 07:48:26,295] INFO Logs loading complete in 159 ms. (kafka.log.LogManager)
[2018-11-22 07:48:26,309] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2018-11-22 07:48:26,311] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2018-11-22 07:48:26,608] INFO Awaiting socket connections on 0.0.0.0:29092. (kafka.network.Acceptor)
[2018-11-22 07:48:26,640] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2018-11-22 07:48:26,652] INFO [SocketServer brokerId=0] Started 2 acceptor threads (kafka.network.SocketServer)
[2018-11-22 07:48:26,678] INFO [ExpirationReaper-0-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,680] INFO [ExpirationReaper-0-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,686] INFO [ExpirationReaper-0-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,697] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2018-11-22 07:48:26,746] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.zk.KafkaZkClient)
[2018-11-22 07:48:26,750] INFO Result of znode creation at /brokers/ids/0 is: OK (kafka.zk.KafkaZkClient)
[2018-11-22 07:48:26,751] INFO Registered broker 0 at path /brokers/ids/0 with addresses: ArrayBuffer(EndPoint(ec2-18-219-76-229.us-east-2.compute.amazonaws.com,9092,ListenerName(EXTERNAL_PLAINTEXT),PLAINTEXT), EndPoint(kafka-0.broker.ashokleyland.svc.cluster.local,29092,ListenerName(INTERNAL_PLAINTEXT),PLAINTEXT)) (kafka.zk.KafkaZkClient)
[2018-11-22 07:48:26,845] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,847] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,849] INFO Creating /controller (is it secure? false) (kafka.zk.KafkaZkClient)
[2018-11-22 07:48:26,856] INFO Result of znode creation at /controller is: OK (kafka.zk.KafkaZkClient)
[2018-11-22 07:48:26,872] INFO [ExpirationReaper-0-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-11-22 07:48:26,893] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2018-11-22 07:48:26,913] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2018-11-22 07:48:26,922] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 9 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-11-22 07:48:26,938] INFO [ProducerId Manager 0]: Acquired new producerId block (brokerId:0,blockStartProducerId:3000,blockEndProducerId:3999) by writing to Zk with path version 4 (kafka.coordinator.transaction.ProducerIdManager)
[2018-11-22 07:48:26,991] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2018-11-22 07:48:26,998] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2018-11-22 07:48:26,998] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2018-11-22 07:48:27,111] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2018-11-22 07:48:27,139] INFO [SocketServer brokerId=0] Started processors for 2 acceptors (kafka.network.SocketServer)
[2018-11-22 07:48:27,153] INFO Kafka version : 2.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2018-11-22 07:48:27,153] INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)
[2018-11-22 07:48:27,158] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2018-11-22 07:48:27,224] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions testing-5,testing-0,testing-2,testing-1,testing-3,testing-4 (kafka.server.ReplicaFetcherManager)
[2018-11-22 07:48:27,237] INFO Replica loaded for partition testing-5 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,240] INFO [Partition testing-5 broker=0] testing-5 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,256] INFO Replica loaded for partition testing-2 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,256] INFO [Partition testing-2 broker=0] testing-2 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,258] INFO Replica loaded for partition testing-3 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,259] INFO [Partition testing-3 broker=0] testing-3 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,262] INFO Replica loaded for partition testing-0 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,264] INFO [Partition testing-0 broker=0] testing-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,267] INFO Replica loaded for partition testing-4 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,267] INFO [Partition testing-4 broker=0] testing-4 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,269] INFO Replica loaded for partition testing-1 with initial high watermark 0 (kafka.cluster.Replica)
[2018-11-22 07:48:27,269] INFO [Partition testing-1 broker=0] testing-1 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-11-22 07:48:27,282] INFO [ReplicaAlterLogDirsManager on broker 0] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)
[2018-11-22 07:58:26,894] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

I also tried creating a LoadBalancer service first and then using its DNS name as an advertised.listener. That too threw the same error.

If anybody could help me, it would be really great. Kindly let me know if more details are required.

Thanks!

@solsson
Contributor

solsson commented Nov 22, 2018

@ridicule777 Does this differ from the other cases of name resolution outside the cluster that have been discussed in this repo several times? Clients must be able to access all Kafka brokers on the individual listener addresses known to Kafka.
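A quick way to check what a given client is actually being told, using kafkacat's metadata listing as earlier in this thread (a diagnostic sketch; replace the bootstrap host with whichever address your client uses):

kafkacat -L -b <bootstrap-host>:9092
# The broker host:port entries in the output are the addresses every producer and
# consumer must be able to resolve and reach; if they are internal cluster names,
# external clients will time out exactly as described above.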
