Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consume JSON string for Cassandra sink connector #1487

Closed
camwardy opened this issue Dec 9, 2022 · 5 comments
Closed

Consume JSON string for Cassandra sink connector #1487

camwardy opened this issue Dec 9, 2022 · 5 comments

Comments

@camwardy
Copy link

camwardy commented Dec 9, 2022

Hello,

I'm trying to get the Cassandra sink connector to work with a JSON string produced to a Kafka topic. I've managed to get the connector to add rows to my table when producing the Kafka message:

["1","John",1670428382089]

But would instead like to produce messages on to the topic in the following format:

{
    "id" : "1",
    "name" : "John",
    "created_at" : 1670428382089
}

I've attempted to use value.converter=org.apache.kafka.connect.json.JsonConverter in my sink properties, but receive the exception: NoTypeConversionAvailableException: No type converter available to convert from type: java.util.HashMap to the required type: java.io.InputStream.

Is it possible to achieve this with the Camel Cassandra sink connector?


Cassandra table:

USE connect;
CREATE TABLE person (
    id          TEXT PRIMARY KEY, 
    name        TEXT, 
    created_at  TIMESTAMP
);

Sink properties:

name=person-cassandra-sink-connector
topics=person
tasks.max=1
connector.class=org.apache.camel.kafkaconnector.cassandrasink.CamelCassandrasinkSinkConnector
value.converter=org.apache.kafka.connect.storage.StringConverter

camel.kamelet.cassandra-sink.connectionHost=cassandra
camel.kamelet.cassandra-sink.connectionPort=9042
camel.kamelet.cassandra-sink.keyspace=connect
camel.kamelet.cassandra-sink.query=insert into person(id, name, created_at) values (?, ?, ?)
camel.kamelet.cassandra-sink.prepareStatements=false
@aozmen121
Copy link

We're also getting a similar issue as above. @oscerd & @orpiske Could you please see if we're doing something wrong?

@orpiske
Copy link
Contributor

orpiske commented Dec 9, 2022

We're also getting a similar issue as above. @oscerd & @orpiske Could you please see if we're doing something wrong?

Folks, I won't be able to look too much into this today as I am in PTO, but ... by the way it sounds, it seems like something you would need to use a Single Message Transformation (SMT) to do so.

Have you tried using that?

@orpiske
Copy link
Contributor

orpiske commented Dec 9, 2022

More specifically: maybe you want to implement a custom SMT to do so.

@camwardy
Copy link
Author

camwardy commented Dec 9, 2022

@oscerd @orpiske Thanks for the quick reply whilst on PTO. Using a custom SMT was going to be my last resort, I was just wondering if this connector implemented something similar to the Datastax connector, which can map JSON fields to Cassandra columns (https://docs.datastax.com/en/kafka/doc/kafka/kafkaStringJson.html) ?

@camwardy
Copy link
Author

We reverted to implementing our own SMT that did the trick in the end, thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants