compatibility with Elasticsearch version 7 #314

Closed
HBO2 opened this issue Jun 6, 2019 · 16 comments

@HBO2 HBO2 commented Jun 6, 2019

I am not totally sure, but according to my observations in the error log of the Kafka ES sink connector, the connector is not compatible with Elasticsearch 7.1. This is because of the required "type.name" in the connector's configuration. The "type.name" parameter is not compatible with ES 7.1; the incompatibility is related to: https://www.elastic.co/guide/en/elasticsearch/reference/master/removal-of-types.html

UPDATE: I can solve it by using "schema.ignore:true", but that gives me errors in some topics, like:

Bulk request failed: [{"type":"mapper_parsing_exception","reason":"failed to parse field [amount] of type [float] in document with id 'project_budget_contributor+0+54677'","caused_by":{"type":"number_format_exception","reason":"For input string: \"AmIL\""}}]

So it seems the ES sink connector is sending a string which is not recognized as a number. A decimal like 5094.61 is sent as "B8YV" and 5094.60 is sent as "B8YU".
It looks a bit like the issue where numeric.mapping doesn't work for DECIMAL fields:
confluentinc/kafka-connect-jdbc#563
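For what it's worth, those opaque strings look like base64-encoded bytes of a Kafka Connect Decimal logical type (the big-endian two's-complement bytes of the unscaled integer), which is what a JSON converter emits for decimal source columns. A minimal sketch to check this, assuming the amount field has scale 2 (the scale is an assumption here, not something confirmed by the connector config above):

```python
import base64
from decimal import Decimal

def decode_connect_decimal(b64: str, scale: int) -> Decimal:
    """Decode a Kafka Connect Decimal logical type value.

    Connect serializes org.apache.kafka.connect.data.Decimal as the
    big-endian two's-complement bytes of the unscaled integer; with a
    JSON converter those bytes surface as a base64 string.
    """
    raw = base64.b64decode(b64)
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)

# The strings from the error above, assuming scale=2 for the amount column:
print(decode_connect_decimal("B8YV", 2))  # 5094.61
print(decode_connect_decimal("B8YU", 2))  # 5094.60
```

Decoding "B8YV" with scale 2 yields exactly 5094.61, which matches the values reported above, so this points at the converter/decimal handling rather than at ES itself.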

What happened

Step A:
Before upgrading to ES 7.1, I was running an ES 6.8 cluster with 60 indices fed from the Confluent ES sink Kafka connector with the following configuration:

connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
type.name=ainsource
transforms.timestamp.timestamp.field=timestamp
tasks.max=1
topics=contract_payments
transforms=unwrap,key,timestamp
key.ignore=true
transforms.timestamp.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.key.field=id
transforms.key.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
connection.url=http://elasticsearch-master:9200

Created an index template _template/kafkaconnect in ES 6.8:

{
  "kafkaconnect" : {
    "order" : 0,
    "index_patterns" : [
      "person*",
      "organisation*",
      "mbo*",
      "project*",
      "system*",
      "user*",
      "talend",
      "travel*",
      "postal*",
      "notification*",
      "mobilised*",
      "memo",
      "expert*",
      "donor*",
      "contract*",
      "advisory*",
      "address",
      "comment*",
      "contact*",
      "emergency*",
      "file",
      "goal",
      "intake*",
      "language*",
      "light*",
      "linked*"
    ],
    "settings" : {
      "index" : {
        "number_of_shards" : "1",
        "number_of_replicas" : "1"
      }
    },
    "mappings" : {
      "ainsource" : {
        "dynamic_templates" : [
          {
            "dates" : {
              "mapping" : {
                "type" : "date"
              },
              "match" : "timestamp"
            }
          },
          {
            "created" : {
              "mapping" : {
                "type" : "date"
              },
              "match" : "created_at"
            }
          },
          {
            "modified" : {
              "mapping" : {
                "type" : "date"
              },
              "match" : "modified_on"
            }
          },
          {
            "otherdates" : {
              "mapping" : {
                "type" : "date"
              },
              "match" : "date_*"
            }
          },
          {
            "duestartenddates" : {
              "mapping" : {
                "type" : "date"
              },
              "match" : "*_date"
            }
          },
          {
            "non_analysed_string_template" : {
              "mapping" : {
                "ignore_above" : 1025,
                "type" : "keyword"
              },
              "match_mapping_type" : "string",
              "match" : "*"
            }
          }
        ]
      }
    },
    "aliases" : { }
  }
}

Everything worked fine.

Step B:
Followed the instructions in ES 6.8 to check whether it's possible to upgrade. After this I did the upgrade and everything seemed to work, because the "old" indices still had the type.name "ainsource".

Step C:
When new values were streaming in from the ES sink connector with the configuration described above, every topic with new values stopped working with the same error:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot create mapping {"ainsource":{"properties":{"id":{"type":"integer"},"contract_id":{"type":"integer"},"title":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"amount":{"type":"double"},"status":{"type":"integer"},"date_planned":{"type":"long"},"date_receipt":{"type":"long"},"date_request":{"type":"long"},"modified_by":{"type":"integer"},"modified_on":{"type":"long"},"code":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"date_paid":{"type":"long"},"amount_paid":{"type":"double"},"is_payment_imported":{"type":"short"},"timestamp":{"type":"date"}}}} -- {"root_cause":[{"type":"illegal_argument_exception","reason":"Types cannot be provided in put mapping requests, unless the include_type_name parameter is set to true."}],"type":"illegal_argument_exception","reason":"Types cannot be provided in put mapping requests, unless the include_type_name parameter is set to true."}
	at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createMapping(JestElasticsearchClient.java:255)
	at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:67)
	at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:260)
	at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:163)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
	... 10 more

Question 1: In case this is not a bug, can I assume the current Confluent ES sink connector is not compatible with ES 7.1 and I need to go back to ES 6.8?
Question 2: Are there any plans to make the connector compatible with ES 7.1?

BTW: The Elasticsearch team recommends doing the following:
"In order to not be broken by this change, you should make your application pass include_type_name=true to these API calls, which is a no-op on 6.x, and will tell 7.x to make these APIs behave as they used to do in 6.x. For instance here is an index creation call with this parameter. It works the same way on 6.8 and 7.x."
So perhaps it would be possible to build the PUT URL with something like
?include_type_name=true
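To illustrate the idea, a sketch of what such URL building could look like. This is illustrative only: the connector actually issues its requests through the Jest client, and put_mapping_url is a hypothetical helper, not connector code.

```python
# Hypothetical helper illustrating the suggested workaround: append
# include_type_name=true to typed mapping requests so that ES 6.8
# (where it is a no-op) and ES 7.x both accept them.
# NOT connector code; the real connector goes through the Jest client.

def put_mapping_url(connection_url: str, index: str, type_name: str,
                    es_major_version: int) -> str:
    url = f"{connection_url}/{index}/_mapping/{type_name}"
    if es_major_version >= 7:
        url += "?include_type_name=true"
    return url

print(put_mapping_url("http://elasticsearch-master:9200",
                      "contract_payments", "ainsource", 7))
# http://elasticsearch-master:9200/contract_payments/_mapping/ainsource?include_type_name=true
```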

thx!

@lifehacking lifehacking commented Jun 12, 2019

Any updates on ES 7 + support ?

@ynux ynux commented Jun 13, 2019

According to moving-from-types-to-typeless-apis-in-elasticsearch-7-0, it should work for both 6 and 7 when the connector passes include_type_name=true.

@HBO2 HBO2 commented Jun 17, 2019

@ynux

According to moving-from-types-to-typeless-apis-in-elasticsearch-7-0, it should work for both 6 and 7 when the connector passes include_type_name=true.

How can I do this? "include_type_name=true" is not a valid setting in the connector's config; there is no such setting.

@ynux ynux commented Jun 17, 2019

Sorry, my comment was misleading; it wasn't a tip for users. It is more of a request that someone fluent in Java please add this option to the connector.

@rmoff rmoff commented Jul 1, 2019

Elasticsearch 7 is not yet supported with the Elasticsearch sink connector. We'll update the docs to reflect this clearly, and will work on adding support for ES 7 into the connector.

@rhauch rhauch commented Jul 1, 2019

I'd prefer to avoid adding a configuration for this. We can easily tell the version of Elasticsearch that the connector is talking to, and can automatically determine whether to add ?include_type_name=true to the appropriate requests, as recommended by https://www.elastic.co/guide/en/elasticsearch/reference/master/removal-of-types.html. This would be a short-term fix, since it will work with Elasticsearch 7.x but won't work with Elasticsearch 8.x; the connector relies pretty heavily on mapping types, so we'll probably have to do a lot more work to support 8.x.

Note that we can detect the version information and, more importantly, the minimum_wire_compatibility_version and minimum_index_compatibility_version fields:

$ curl -s http://localhost:9200 | jq '.version'
{
  "number": "6.7.0",
  "build_flavor": "default",
  "build_type": "docker",
  "build_hash": "8453f77",
  "build_date": "2019-03-21T15:32:29.844721Z",
  "build_snapshot": false,
  "lucene_version": "7.7.0",
  "minimum_wire_compatibility_version": "5.6.0",
  "minimum_index_compatibility_version": "5.0.0"
}
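That version object is enough to drive the decision. For example (a sketch only; needs_include_type_name is a hypothetical name, not connector code):

```python
import json

# Sketch: parse the "version" object returned by GET / (as in the curl
# output above) and decide whether mapping requests need the
# include_type_name=true parameter. Hypothetical helper, not connector code.

def needs_include_type_name(version_info: dict) -> bool:
    major = int(version_info["number"].split(".")[0])
    return major >= 7

response = json.loads('{"number": "6.7.0", "lucene_version": "7.7.0"}')
print(needs_include_type_name(response))  # False: 6.x accepts typed mappings
```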
@yotov yotov commented Jul 2, 2019

The removed _type field is one of the problems. They also renamed the _version field (https://www.elastic.co/guide/en/elasticsearch/reference/7.x/breaking-changes-7.0.html#_camel_case_and_underscore_parameters_deprecated_in_6_x_have_been_removed), which is added by the Jest library when key.ignore is false.

@babadofar babadofar commented Aug 14, 2019

This case seems to be fixed by the PRs above, but I'm not totally sure where to find out whether there has been a release. Would this be backported to previous versions of Confluent Platform?

@strigona-worksight strigona-worksight commented Aug 14, 2019

Following the flow a bit, it looks like the fix was merged into branch 5.1.x, which was subsequently merged into 5.2.x, 5.3.x, and 5.4.x, so any release after July 31st should have it. So far there are only beta and RC releases. It would be nice to have a timeframe for when this is expected to land in a stable release.

@dkirrane dkirrane commented Aug 29, 2019

Any idea how to update the cp-docker-image?
https://github.com/confluentinc/cp-docker-images/blob/5.3.0-post/debian/kafka-connect/Dockerfile#L33

I built the Kafka Connect Dockerfile on both the 5.3.0-post and 5.3.x branches, and it always seems to download the old version:

Unpacking confluent-kafka-connect-elasticsearch (5.1.2-1)
@strigona-worksight strigona-worksight commented Aug 29, 2019

@dkirrane I don't have an answer for that issue, but we did play around with 5.2.3 which was released 6 days ago and it works with ES 7.

@Avi-Levi Avi-Levi commented Sep 8, 2019

Hi guys,
Any info regarding when/which Confluent version will include this fix?

@hasmizal hasmizal commented Sep 23, 2019

Not sure whether this is the right thing to do, but just to share: by setting type.name=_doc, I managed to index my data into ES 7.3.1, which previously failed with type.name=kafka-connect.

@NathanNam NathanNam commented Oct 1, 2019

ES 7 support has been released with CP 5.3.1. https://docs.confluent.io/5.3.1/release-notes/index.html#kafka-connect

@NathanNam NathanNam closed this Oct 1, 2019
@rmoff rmoff commented Oct 1, 2019

@NathanNam see #342 - I can't get it to work, using 5.3.1.

@rmoff rmoff reopened this Oct 1, 2019
@purbon purbon self-assigned this Oct 4, 2019
@rmoff rmoff commented Oct 7, 2019

This does indeed work; I had made a fubar with my config. See #342.

@rmoff rmoff closed this Oct 7, 2019