
ConnectException: Cannot create mapping when using RegexRouter/TimestampRouter SMT #99

Closed
rmoff opened this issue Jul 12, 2017 · 18 comments

@rmoff commented Jul 12, 2017

The following config works fine:

{
  "config": {
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter.schema.registry.url": "http://localhost:8081",
  "topics": "sqlite-foo",
  "key.ignore": "true",
  "type.name": "type.name=kafka-connect",
  "connection.url": "http://localhost:9200"
  },
  "name": "es-sink-sqlite-foo"
}

But if I add the org.apache.kafka.connect.transforms.TimestampRouter SMT:

{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter.schema.registry.url": "http://localhost:8081",
  "topics": "sqlite-foo",
  "key.ignore": "true",
  "type.name": "type.name=kafka-connect",
  "connection.url": "http://localhost:9200",
  "transforms":"routeTS",
  "transforms.routeTS.type":"org.apache.kafka.connect.transforms.TimestampRouter"
}

the Elasticsearch Sink fails:

[2017-07-12 16:06:16,869] ERROR Task es-sink-sqlite-foo-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
org.apache.kafka.connect.errors.ConnectException: Cannot create mapping {"type.name=kafka-connect":{"properties":{"c1":{"type":"long"},"c2":{"type":"string"},"messagetopic":{"type":"string"},"messagesource":{"type":"string"}}}} -- {"root_cause":[{"type":"index_not_found_exception","reason":"no such index","resource.type":"index_or_alias","resource.id":"sqlite-foo-20170712","index_uuid":"_na_","index":"sqlite-foo-20170712"}],"type":"index_not_found_exception","reason":"no such index","resource.type":"index_or_alias","resource.id":"sqlite-foo-20170712","index_uuid":"_na_","index":"sqlite-foo-20170712"}
        at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:65)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:201)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:119)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
[2017-07-12 16:06:16,871] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
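
For reference, the index name sqlite-foo-20170712 in the error is exactly what TimestampRouter produces by default: its topic.format setting defaults to ${topic}-${timestamp} and its timestamp.format to yyyyMMdd, so the failing config above is equivalent to spelling the defaults out:

  "transforms": "routeTS",
  "transforms.routeTS.type": "org.apache.kafka.connect.transforms.TimestampRouter",
  "transforms.routeTS.topic.format": "${topic}-${timestamp}",
  "transforms.routeTS.timestamp.format": "yyyyMMdd"

The sink then tries to create a mapping for the renamed topic's index, but that index was never created, hence the index_not_found_exception.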

The same problem exists with RegexRouter, e.g.:

        "transforms":"dropPrefix",
        "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.dropPrefix.regex":"sqlite-(.*)",
        "transforms.dropPrefix.replacement":"$1"

It looks like the sink correctly auto-creates indices based on the inbound topic name, but not when the topic name is mutated by an SMT.

@kkonstantine (Member) commented

Probably related to: https://issues.apache.org/jira/browse/KAFKA-5567
Can you try with Kafka Connect workers built from the Apache Kafka trunk repo?

@perryao commented Mar 23, 2018

Is there a workaround for this?

@jay999123 commented

Can anyone comment on what the workaround for this issue is, or when it will be resolved?

@rmoff (Author) commented Aug 22, 2018

This works fine now as of Apache Kafka 2.0 / Confluent Platform 5.0

@mattferroni commented

Hey @rmoff, same problem here :(
Any suggestion on how to handle it with an earlier Kafka deployment?

@rmoff (Author) commented Aug 30, 2018

@mattferroni which version are you running? The options as I see them are:

  • Create the target Elasticsearch indices another way (manually, or scripted; see the sketch below)
  • Upgrade your Kafka Connect version
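
For the first option, a minimal sketch, using the sqlite-foo-20170712 index name from the TimestampRouter example earlier in this thread (adjust host and index name to your setup): pre-create the target index via the Elasticsearch REST API before the connector tries to write to it:

  curl -X PUT http://localhost:9200/sqlite-foo-20170712

Note that with TimestampRouter the target index name rolls over with the timestamp, so in practice this would need to run on a schedule, ahead of each new period.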

@mattferroni commented

Running Kafka 1.0; I finally scripted it (manually). Thanks anyway!

@gunnarmorling commented Oct 11, 2018

FWIW, I'm still seeing this issue with Kafka 2.0 and kafka-connect-elasticsearch-5.0.0.jar. I'd like to apply RegexRouter, but it seems to be ignored. In my case that causes things not to work at all, since the inbound topic name isn't a valid index name:

Invalid index name [dbserver1.inventory.Hike], must be lowercase","index_uuid":"_na_","index":"dbserver1.inventory.Hike"}
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndices(JestElasticsearchClient.java:238)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.createIndicesForTopics(ElasticsearchWriter.java:330)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.open(ElasticsearchSinkTask.java:157)

@rmoff (Author) commented Oct 11, 2018

Hey @gunnarmorling :) Can you share your full sink config?

@gunnarmorling commented Oct 11, 2018

@rmoff, of course, here it is:

{
    "name": "elastic-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "Customers",
        "connection.url": "http://elastic:9200",
        "transforms": "unwrap,key,route",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "id",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "Customers",
        "transforms.route.replacement": "customers",
        "key.ignore": "false",
        "type.name": "customer"
    }
}

I also have an executable reproducer, if you'd like to take a look: https://github.com/gunnarmorling/debezium-examples/tree/kafka-connect-elasticsearch-99/unwrap-smt

Check out that branch and follow the steps in https://github.com/gunnarmorling/debezium-examples/blob/kafka-connect-elasticsearch-99/unwrap-smt/reproducer.md. Registering the sink connector fails due to an upper-case index name, which should have been transformed by the regex router. I've just verified it's using Kafka 2.0 and ES sink 5.0.0.

Maybe I'm making some silly mistake here?

@rmoff (Author) commented Oct 12, 2018

@gunnarmorling I reproduced your problem using the steps provided.

However, I think it's different from the original problem in this ticket. Whereas before, Kafka Connect was applying the regex SMT but not creating the target ES index, in your case it seems simply not to be applying the regex SMT at all.

You can see this from the Elasticsearch error, where it tries to use the original topic name as the index: Invalid index name [Customers].

The practical workaround, if you just want to force the index name to lowercase, is the topic.index.map config option:

    "topic.index.map":"Customers:customers",

But it's probably worth raising a separate ticket for the RegexRouter issue you're seeing anyway.
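
For completeness, a sketch of the sink config above with the route SMT dropped in favour of topic.index.map (everything else assumed unchanged):

{
    "name": "elastic-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "Customers",
        "connection.url": "http://elastic:9200",
        "transforms": "unwrap,key",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "id",
        "topic.index.map": "Customers:customers",
        "key.ignore": "false",
        "type.name": "customer"
    }
}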

@gunnarmorling commented

@rmoff Thanks for confirming. I'll file a new issue then. Regarding topic.index.map, that's what I'm using for now (albeit deprecated).

Btw, I suppose there's a general shortcoming of either approach: it doesn't seem possible to generically lower-case multiple topic names at once, since Java's regex engine doesn't support case conversion in replacements AFAIK; that is, unless the regex router supports some extended expression format.
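
For illustration: topic.index.map takes a comma-separated list of topic:index pairs, so lower-casing several topics means enumerating every mapping explicitly (topic names other than Customers are hypothetical):

  "topic.index.map": "Customers:customers,Orders:orders,Products:products"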

@gunnarmorling commented

Logged #246.

@mingfang commented Nov 13, 2018

I'm having the same problem with RegexRouter.

Due to the way this connector creates indices at startup, I don't think using an SMT like RegexRouter to dynamically change the index name can ever work. Even if you set ES to auto-create indices, there's still the problem of this connector creating unwanted indices on startup.

The best solution is to use RegexRouter on the source side to ensure the desired ES sink index names. The downside is that the source and sink are now more tightly coupled.
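
As a sketch of that source-side approach, the same RegexRouter settings from the original report would go on the source connector instead (the rest of the source config is assumed), so the topic is created under its final name and the sink never sees the sqlite- prefix:

  "transforms": "dropPrefix",
  "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.dropPrefix.regex": "sqlite-(.*)",
  "transforms.dropPrefix.replacement": "$1"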

@kamaladevij commented

I am having the same problem with TimestampRouter; my sink properties are below.

Confluent Platform 5.0.0 / Kafka 2.0

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=testmetric5
#topic.schema.ignore=testmertic1
key.ignore=false
connection.url=http://10.113.144.79:30011
type.name=kafka-connect

#Transformations
transforms=testmetric5
transforms.testmetric5.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.testmetric5.topic.format=testmetric5-${timestamp}
transforms.testmetric5.timestamp.format=YYYYMM
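
As an aside, timestamp.format here is a java.text.SimpleDateFormat pattern, in which uppercase YYYY means week-based year; for a calendar year the lowercase form is almost certainly what is intended:

  transforms.testmetric5.timestamp.format=yyyyMM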

Exception:
[2018-11-19 13:53:18,110] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:584)
org.apache.kafka.connect.errors.ConnectException: Cannot create mapping {"kafka-connect":{"properties":{"f1":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}}}}} -- {"root_cause":[{"type":"index_not_found_exception","reason":"no such index","resource.type":"index_or_alias","resource.id":"testmetric5-201811","index_uuid":"_na_","index":"testmetric5-201811"}],"type":"index_not_found_exception","reason":"no such index","resource.type":"index_or_alias","resource.id":"testmetric5-201811","index_uuid":"_na_","index":"testmetric5-201811"}
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createMapping(JestElasticsearchClient.java:255)
        at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:67)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:260)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:163)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2018-11-19 13:53:18,111] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot create mapping {"kafka-connect":{"properties":{"f1":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}}}}} -- {"root_cause":[{"type":"index_not_found_exception","reason":"no such index","resource.type":"index_or_alias","resource.id":"testmetric5-201811","index_uuid":"_na_","index":"testmetric5-201811"}],"type":"index_not_found_exception","reason":"no such index","resource.type":"index_or_alias","resource.id":"testmetric5-201811","index_uuid":"_na_","index":"testmetric5-201811"}
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createMapping(JestElasticsearchClient.java:255)
        at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:67)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:260)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:163)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
        ... 10 more
[2018-11-19 13:53:18,111] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

@jrmma commented Feb 22, 2019

Hi Robin,
I'm facing a similar error without using any transforms in the sink config.

Confluent version: 5.1.2
ES: 6.5.4

The source config parses MySQL successfully (a consumer can read the topic), but as soon as I start the sink I get this error:

"type\":\"mapper_parsing_exception\",
\"reason\":\"Mapping definition for [columnname] has unsupported parameters:  [null_value : 1970-01-01T00:00:00Z]\"

However, the column is defined as a non-null timestamp in the DB. The snapshot succeeds; the error arises once the connector moves on to reading the binlog.

Another table (different connector) throws a similar error:

"type\":\"mapper_parsing_exception\",
\"reason\":\"Mapping definition for [ActionTimestamp] has unsupported parameters:  [null_value : ---]\"

where '---' was specified as the default value for the non-null column.

sink config:

{
  "name": "sinkname",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "connection.url": "http://localhost:9200",
    "type.name": "type.name=kafka-connect",
    "topics": "topicname",
    "key.ignore": "true"
  }
}

Can you please point me in the right direction?

@jrmma commented Feb 26, 2019

@levzem (Contributor) commented Feb 7, 2020

Closing because the issue was resolved.

levzem closed this as completed Feb 7, 2020