
Issue with camel-google-pubsub-kafka-connector sink configuration #1209

Closed
simplifadmin opened this issue Jun 14, 2021 · 35 comments

Comments

@simplifadmin

simplifadmin commented Jun 14, 2021

Issue when using camel-google-pubsub-kafka-connector as a sink for Strimzi Kafka.

I've created a single-node KafkaConnect cluster using the Camel Kafka Connector for Pub/Sub.

That works fine.

Please find the k8s files and the error message below.

Any idea how to resolve the google-pubsub-kafka-connector issue on k8s?

K8s file for KafkaConnect:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
  namespace: kafka
  annotations:
    # enable connect operator
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  #version: 2.6.0
  image: camel/my-connect:v1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  resources:
    requests:
      cpu: 250m
      memory: 512Mi
    limits:
      cpu: 500m
      memory: 1Gi
  jvmOptions:
    gcLoggingEnabled: false
  config:
    group.id: my-connect
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    offset.storage.topic: my-connect-offsets
    config.storage.topic: my-connect-configs
    status.storage.topic: my-connect-status
    config.storage.replication.factor: 3 #n for production per n brokers
    offset.storage.replication.factor: 3 #per broker
    status.storage.replication.factor: 3 #per broker

Issue with pubsub-kafka

kind: KafkaConnector
apiVersion: kafka.strimzi.io/v1beta2
metadata:
  name: pubsub-kafka
  namespace: kafka
  labels:
    # must match connect cluster name
    strimzi.io/cluster: my-connect
spec:
  tasksMax: 1
  class: org.apache.camel.kafkaconnector.googlepubsub.CamelGooglepubsubSinkConnector
  config:
    #The Google Cloud PubSub Project Id
    camel.sink.path.projectId: <Project Id>
    #The Destination Name. For the consumer this will be the subscription name, while for the producer this will be the topic name.
    camel.sink.path.destinationName: test-topic

Error message from the KafkaConnect container:

2021-06-14 13:18:35,037 ERROR Uncaught exception in REST call to /connectors/pubsub-kafka/config (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper) [qtp977975287-18]
org.apache.kafka.common.config.ConfigException: Must configure one of topics or topics.regex
at org.apache.kafka.connect.runtime.SinkConnectorConfig.validate(SinkConnectorConfig.java:102)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:352)
at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$1(AbstractHerder.java:326)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
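
The error above comes from Kafka Connect itself, not from Camel: every sink connector must declare which Kafka topics it consumes. A minimal sketch of the fix, assuming the topic is named test-topic, is to add a topics entry to the connector config:

spec:
  class: org.apache.camel.kafkaconnector.googlepubsub.CamelGooglepubsubSinkConnector
  tasksMax: 1
  config:
    # Kafka Connect requires one of topics or topics.regex for any sink connector
    topics: test-topic
    camel.sink.path.projectId: <Project Id>
    camel.sink.path.destinationName: test-topic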


@oscerd
Contributor

oscerd commented Jun 14, 2021

You didn't specify the projectId for the Google Cloud Pub/Sub service, and you also didn't specify the credentials for accessing the cloud service. Please read the documentation and have a look at this example.

@simplifadmin
Author

I removed the projectId to avoid exposing it.

@simplifadmin
Author

simplifadmin commented Jun 14, 2021

Updated pubsub-kafka config:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: pubsub-kafka
  namespace: kafka
  labels:
  # must match connect cluster name
    strimzi.io/cluster: my-connect
spec:
  class: org.apache.camel.kafkaconnector.googlepubsub.CamelGooglepubsubSinkConnector
  tasksMax: 5
  config:
  #The Google Cloud PubSub Project Id
    camel.sink.path.projectId: test
    topics: test-topic
  #The Destination Name. For the consumer this will be the subscription name, while for the producer this will be the topic name.
    camel.sink.path.destinationName: test-topic

KafkaConnect output:

2021-06-14 13:47:59,095 INFO [Consumer clientId=connector-consumer-pubsub-kafka-0, groupId=connect-pubsub-kafka] Member connector-consumer-pubsub-kafka-0-921dcb28-9357-4ac4-a767-42898b659fdf sending LeaveGroup request to coordinator my-cluster-kafka-1.my-cluster-kafka-brokers.kafka.svc:9092 (id: 2147483646 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-pubsub-kafka-0]

@oscerd
Contributor

oscerd commented Jun 14, 2021

You need to provide credentials. Also please report the version of the connector you're using.

@simplifadmin
Author

version: latest (0.7.X)

@oscerd
Contributor

oscerd commented Jun 14, 2021

Is the version latest or 0.7.x?

What is the version of the jar you're installing, and how are you installing it?

@simplifadmin
Author

Output from KafkaConnect:
Caused by: io.grpc.StatusRuntimeException: PERMISSION_DENIED: Request had insufficient authentication scopes.
at io.grpc.Status.asRuntimeException(Status.java:533)

@simplifadmin
Author

The version is latest or 0.7.x.

What is the version of the jar you're installing and how are you installing the jar

I created the GKE cluster using version 1.20; I'm not sure about the jar.

@oscerd
Contributor

oscerd commented Jun 14, 2021

The version of the connector.

@simplifadmin
Author

simplifadmin commented Jun 14, 2021

The version is the latest.

@simplifadmin
Author

Here's a new attempt at authentication. Is this the way to implement serviceAccountKey?


apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: pubsub-kafka
  namespace: kafka
  labels:
  # must match connect cluster name
    strimzi.io/cluster: my-connect
spec:
  class: org.apache.camel.kafkaconnector.googlepubsub.CamelGooglepubsubSinkConnector
  tasksMax: 5
  config:
  #The Google Cloud PubSub Project Id
    camel.sink.path.projectId: <projectID>
    topics: test-topic
  #The Destination Name. For the consumer this will be the subscription name, while for the producer this will be the topic name.
    camel.sink.path.destinationName: projects/project/topics/test-topic
  #The Service account key that can be used as credentials for the PubSub publisher/subscriber. 
    camel.component.google-pubsub.serviceAccountKey: "{{key.json}}"
integration:
    configuration:
    - type: "secret"
      value: "pubsub-key"

@oscerd
Contributor

oscerd commented Jun 15, 2021

No, you need to provide a file and pass the path to the service account key file. You can get the file from the Google Cloud console.
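
On Kubernetes, one way to get the key file onto the Connect pods is Strimzi's external configuration, which mounts a Secret under /opt/kafka/external-configuration/<volume-name>. A sketch, assuming a Secret named pubsub-key that holds a file key.json:

# kubectl -n kafka create secret generic pubsub-key --from-file=key.json
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
  namespace: kafka
spec:
  # ... rest of the spec from the KafkaConnect resource above ...
  externalConfiguration:
    volumes:
      # mounted at /opt/kafka/external-configuration/pubsub-key/key.json
      - name: pubsub-key
        secret:
          secretName: pubsub-key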

@simplifadmin
Author

simplifadmin commented Jun 15, 2021

Here's an update

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: pubsub-kafka
  namespace: kafka
  labels:
  # must match connect cluster name
    strimzi.io/cluster: my-connect
spec:
  class: org.apache.camel.kafkaconnector.googlepubsub.CamelGooglepubsubSinkConnector
  tasksMax: 5
  config:
  #The Google Cloud PubSub Project Id
    camel.sink.path.projectId: projected
    topics: test-topic
  #The Destination Name. For the consumer this will be the subscription name, while for the producer this will be the topic name.
    camel.sink.path.destinationName: projects/senseviewapi/topics/test-topic
  #The Service account key that can be used as credentials for the PubSub publisher/subscriber. 
    camel.component.google-pubsub.serviceAccountKey: ${file:/path-of-the-googlepubsub-key.json}

@oscerd
Contributor

oscerd commented Jun 15, 2021

That should be the minimal configuration.

@simplifadmin
Author

What do you mean by minimal configuration?

@oscerd
Contributor

oscerd commented Jun 15, 2021

With these parameters configured correctly it should work.

@simplifadmin
Author

Issue with the Kafka connector:

ERROR WorkerSinkTask{id=pubsub-kafka-1} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-pubsub-kafka-1]
2021-06-15 15:40:13,911 INFO Stopping CamelSinkTask connector task (org.apache.camel.kafkaconnector.CamelSinkTask) [task-thread-pubsub-kafka-1]
2021-06-15 15:40:13,911 INFO CamelSinkTask connector task stopped (org.apache.camel.kafkaconnector.CamelSinkTask) [task-thread-pubsub-kafka-1]
2021-06-15 15:40:13,921 ERROR WorkerSinkTask{id=pubsub-kafka-2} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-pubsub-kafka-2]

Caused by: org.apache.camel.PropertyBindingException: Error binding property (camel.component.google-pubsub.serviceAccountKey=${file:/path-of-file/gcp_keys/pubsub-key.json}) with name: serviceAccountKey on bean: org.apache.camel.component.google.pubsub.GooglePubsubComponent@189 with value: ${file:/path-of-file/gcp_keys/pubsub-key.js

@oscerd
Contributor

oscerd commented Jun 15, 2021

This needs to be tested first in CKC (Camel Kafka Connector). I don't think there's any need for the brackets there; it's a Camel configuration option, not a Kafka Connect one.
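
In other words, the value should be a plain Camel resource reference rather than a ${...} placeholder. A sketch, assuming the key file was mounted via a Secret volume as described above:

camel.component.google-pubsub.serviceAccountKey: file:/opt/kafka/external-configuration/pubsub-key/key.json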

@simplifadmin
Author

simplifadmin commented Jun 17, 2021

I'm having no joy getting the key to work. Any ideas?

camel.component.google-pubsub.serviceAccountKey:

Caused by: org.apache.camel.PropertyBindingException: Error binding property (camel.component.google-pubsub.serviceAccountKey=file:/opt/kafka/external-configuration/key.json

@qafro1

qafro1 commented Jun 29, 2021

Any update on this, please?

@oscerd
Contributor

oscerd commented Jun 29, 2021

No news at the moment. Please try it locally and place the file on your file system.

@qafro1

qafro1 commented Jun 30, 2021

I've tested this locally:

$ bin/connect-standalone.sh config/connect-standalone.properties /opt/kafka/connectors/docs/examples/CamelGooglepubsubSinkConnector.properties

but I get an error message:

Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute.
        at io.grpc.Status.asRuntimeException(Status.java:533)
        ... 13 more
[2021-06-30 17:16:10,722] INFO Stopping CamelSinkTask connector task (org.apache.camel.kafkaconnector.CamelSinkTask:210)
[2021-06-30 17:16:10,723] INFO Apache Camel 3.11.0-SNAPSHOT (camel-1) shutting down (timeout:45s) (org.apache.camel.impl.engine.AbstractCamelContext:3190)
[2021-06-30 17:16:10,729] INFO Routes shutdown summary (total:1 stopped:1) (org.apache.camel.impl.engine.AbstractCamelContext:3389)
[2021-06-30 17:16:10,729] INFO     Stopped route1 (direct://start) (org.apache.camel.impl.engine.AbstractCamelContext:3394)
[2021-06-30 17:16:10,748] INFO Apache Camel 3.11.0-SNAPSHOT (camel-1) shutdown in 24ms (uptime:3s559ms) (org.apache.camel.impl.engine.AbstractCamelContext:3330)
[2021-06-30 17:16:10,748] INFO CamelSinkTask connector task stopped (org.apache.camel.kafkaconnector.CamelSinkTask:224)
[2021-06-30 17:16:10,748] INFO [Consumer clientId=connector-consumer-CamelGoogle-pubsubSinkConnector-0, groupId=connect-CamelGoogle-pubsubSinkConnector] Revoke previously assigned partitions test-topic-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:307)

@oscerd
Contributor

oscerd commented Jul 1, 2021

What do you have in the Kafka topic, and what message are you trying to send to Pub/Sub? It seems you have empty lines.
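
For reference, a non-empty test message can be sent with the console producer that ships with Kafka. A sketch, assuming a local broker and a topic named test-topic:

# each line typed becomes one record; note that an accidental blank line
# produces an empty record, which is what the INVALID_ARGUMENT error rejects
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
> Hello world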

@qafro1

qafro1 commented Jul 1, 2021

Yes, I created a Kafka topic and sent messages like "Hello world". The GCP Kafka connector works, however.
Any ideas, or can you try it yourself?

@oscerd
Contributor

oscerd commented Jul 1, 2021

I'll try to have a look, but not soon. The other connector is not based on Camel.

@qafro1

qafro1 commented Jul 8, 2021

kind: KafkaConnector
apiVersion: kafka.strimzi.io/v1beta2
metadata:
  name: pubsub
  namespace: kafka
  labels:
    # must match connect cluster name
    strimzi.io/cluster: my-connect-cluster
spec:
  tasksMax: 5
  class: org.apache.camel.kafkaconnector.googlepubsub.CamelGooglepubsubSinkConnector
  
  # use the kafka converters that better suit your needs, these are just defaults:
  #key.converter: org.apache.kafka.connect.storage.StringConverter
  #value.converter: org.apache.kafka.connect.storage.StringConverter
  config:
    # comma separated topics to get messages from
    topics: test-topic
    value.converter.schemas.enable: false
    # mandatory properties (for a complete properties list see the connector documentation):

    # The Service account key that can be used as credentials for the PubSub publisher/subscriber. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resource from different systems.
    camel.component.google-pubsub.serviceAccountKey: ${directory:/opt/kafka/external-configuration/google:key.json}
    # The Service account key that can be used as credentials for the PubSub publisher/subscriber. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resource from different systems.
    camel.sink.endpoint.serviceAccountKey: ${directory:/opt/kafka/external-configuration/google:key.json}
    # The Destination Name. For the consumer this will be the subscription name, while for the producer this will be the topic name.
    camel.sink.path.destinationName: test-topic

Issue with KafkaConnector version 0.10.1. Any idea?

Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
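
The stack trace shows a JsonConverter with schemas enabled handling the records, so the value.converter.schemas.enable override alone is not taking effect; converter properties set at the connector level generally only apply when the converter itself is also overridden there. If the records are plain strings, one sketch of a fix is to declare the converters explicitly inside the connector config block (above they are only present as comments):

config:
  topics: test-topic
  key.converter: org.apache.kafka.connect.storage.StringConverter
  value.converter: org.apache.kafka.connect.storage.StringConverter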

@qafro1

qafro1 commented Jul 9, 2021

Issue with the KafkaConnector named pubsub after sending data streams:

name: pubsub
    tasks:
      - id: 0
        state: FAILED
        trace: "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:609)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Exchange delivery has failed!\n\tat org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:199)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)\n\t... 10 more\nCaused by: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: google-pubsub

@qafro1

qafro1 commented Jul 12, 2021

Could anyone assist with this, please?

@oscerd
Contributor

oscerd commented Jul 12, 2021

Please don't push contributors so much. When I have time, I'll have a look; if someone else from the community wants to help, they will.

@oscerd
Contributor

oscerd commented Jul 12, 2021

What is inside the google-pubsub folder under the plugin.path directory?

This:

Failed to resolve endpoint: google-pubsub

means no component was found for the connector.
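
For the endpoint to resolve, the whole connector distribution, meaning the connector jar plus all of its dependencies (including the camel-google-pubsub component jar), must sit in one subdirectory of the worker's plugin.path. A sketch of the expected layout; the jar names and versions are examples only:

/opt/kafka/plugins/
└── camel-google-pubsub-kafka-connector/
    ├── camel-google-pubsub-kafka-connector-0.10.1.jar
    ├── camel-google-pubsub-3.11.0.jar
    └── ... (every other jar from the distribution archive)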

@qafro1

qafro1 commented Jul 13, 2021

What is inside the google-pubsub folder under the plugin.path directory?

This:

Failed to resolve endpoint: google-pubsub

means no component was found for the connector.

The Kafka Connect plugin folder contains the following:
pubsub-sink-connector  storage-sink-connector

@qafro1

qafro1 commented Jul 16, 2021

Caused by: java.net.URISyntaxException: Illegal character in opaque part at index 79: google-pubsub:dat:test-topic?serviceAccountKey=type:service_account
project_id:data
private_key_id:cf3a8
private_key:-----BEGINPRIVATEKEY-----MIIEvQI=-----END%20PRIVATE%20KEY-----
client_email:pubsub-app@dat.iam.gserviceaccount.com
client_id:11
auth_uri:https://accounts.google.com/o/oauth2/auth
token_uri:https://oauth2.googleapis.com/token
auth_provider_x509_cert_url:https://www.googleapis.com/oauth2/v1/certs
client_x509_cert_url:https://www.googleapis.com/robot/v1/metadata/x509/p.iam.gserviceaccount.com
	at java.base/java.net.URI$Parser.fail(URI.java:2913)
	at java.base/java.net.URI$Parser.checkChars(URI.java:3084)
	at java.base/java.net.URI$Parser.parse(URI.java:3120)
	at java.base/java.net.URI.<init>(URI.java:600)
	at org.apache.camel.util.URISupport.doComplexNormalizeUri(URISupport.java:592)
	at org.apache.camel.util.URISupport.normalizeUri(URISupport.java:583)
	at org.apache.camel.support.EndpointHelper.normalizeEndpointUri(EndpointHelper.java:154)
	... 29 more

Any ideas?
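
What the exception shows is the content of key.json, not its path, being spliced into the google-pubsub endpoint URI: the ${directory:...} configuration provider resolves to the file's contents. A sketch of the contrast, keeping the mount path used above:

# what was resolved (the file's contents, which breaks the endpoint URI):
#   camel.component.google-pubsub.serviceAccountKey: ${directory:/opt/kafka/external-configuration/google:key.json}
# what the component expects (a path it can load itself):
camel.component.google-pubsub.serviceAccountKey: file:/opt/kafka/external-configuration/google/key.json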

@oscerd
Contributor

oscerd commented Jul 20, 2021

This configuration works fine in a local Kafka.

For the sink connector:

name=CamelGoogle-pubsubSinkConnector
connector.class=org.apache.camel.kafkaconnector.googlepubsub.CamelGooglepubsubSinkConnector
tasks.max=1

# use the kafka converters that better suit your needs, these are just defaults:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# comma separated topics to get messages from
topics=test-2

# mandatory properties (for a complete properties list see the connector documentation):
# The Service account key that can be used as credentials for the PubSub publisher/subscriber. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resource from different systems.
camel.sink.endpoint.serviceAccountKey=file:/home/oscerd/kafka/demo/serviceaccountkey.json
# The Destination Name. For the consumer this will be the subscription name, while for the producer this will be the topic name.
camel.sink.path.destinationName=test-pubsub
# The Google Cloud PubSub Project Id
camel.sink.path.projectId=test-pubsub-320411

As you can see, I'm pointing to a file on my local filesystem with an absolute path.

Same story for a source connector:

name=CamelGoogle-pubsubSourceConnector
connector.class=org.apache.camel.kafkaconnector.googlepubsub.CamelGooglepubsubSourceConnector
tasks.max=1

# use the kafka converters that better suit your needs, these are just defaults:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter= org.apache.kafka.connect.converters.ByteArrayConverter

# comma separated topics to send messages into
topics=test-1

# mandatory properties (for a complete properties list see the connector documentation):

camel.source.endpoint.serviceAccountKey=file:/home/oscerd/kafka/demo/serviceaccountkey.json
# The Destination Name. For the consumer this will be the subscription name, while for the producer this will be the topic name.
camel.source.path.destinationName=test-pubsub-sub
# The Google Cloud PubSub Project Id
camel.source.path.projectId=test-pubsub-320411

@oscerd
Contributor

oscerd commented Jul 20, 2021

I cannot help more than this.

@oscerd closed this as completed Jul 20, 2021